Source code for nni.algorithms.hpo.metis_tuner.metis_tuner

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""
metis_tuner.py
"""

import copy
import logging
import random
import statistics
import warnings
from multiprocessing.dummy import Pool as ThreadPool
import numpy as np
from schema import Schema, Optional

from nni import ClassArgsValidator
from nni.tuner import Tuner
from nni.common.hpo_utils import validate_search_space
from nni.utils import OptimizeMode, extract_scalar_reward
from . import lib_constraint_summation
from . import lib_data
from .Regression_GMM import CreateModel as gmm_create_model
from .Regression_GMM import Selection as gmm_selection
from .Regression_GP import CreateModel as gp_create_model
from .Regression_GP import OutlierDetection as gp_outlier_detection
from .Regression_GP import Prediction as gp_prediction
from .Regression_GP import Selection as gp_selection

logger = logging.getLogger("Metis_Tuner_AutoML")

NONE_TYPE = ''
CONSTRAINT_LOWERBOUND = None
CONSTRAINT_UPPERBOUND = None
CONSTRAINT_PARAMS_IDX = []

class MetisClassArgsValidator(ClassArgsValidator):
    def validate_class_args(self, **kwargs):
        Schema({
            Optional('optimize_mode'): self.choices('optimize_mode', 'maximize', 'minimize'),
            Optional('no_resampling'): bool,
            Optional('no_candidates'): bool,
            Optional('selection_num_starting_points'): int,
            Optional('cold_start_num'): int,
        }).validate(kwargs)

[docs]class MetisTuner(Tuner): """ `Metis tuner <https://www.microsoft.com/en-us/research/publication/metis-robustly-tuning-tail-latencies-cloud-systems/>`__ offers several benefits over other tuning algorithms. While most tools only predict the optimal configuration, Metis gives you two outputs, a prediction for the optimal configuration and a suggestion for the next trial. No more guess work! While most tools assume training datasets do not have noisy data, Metis actually tells you if you need to resample a particular hyper-parameter. While most tools have problems of being exploitation-heavy, Metis' search strategy balances exploration, exploitation, and (optional) resampling. Metis belongs to the class of sequential model-based optimization (SMBO) algorithms and it is based on the Bayesian Optimization framework. To model the parameter-vs-performance space, Metis uses both a Gaussian Process and GMM. Since each trial can impose a high time cost, Metis heavily trades inference computations with naive trials. At each iteration, Metis does two tasks (refer to :footcite:t:`li2018metis` for details): 1. It finds the global optimal point in the Gaussian Process space. This point represents the optimal configuration. 2. It identifies the next hyper-parameter candidate. This is achieved by inferring the potential information gain of exploration, exploitation, and resampling. Note that the only acceptable types in the :doc:`search space </hpo/search_space>` are ``quniform``, ``uniform``, ``randint``, and numerical ``choice``. Examples -------- .. code-block:: config.tuner.name = 'Metis' config.tuner.class_args = { 'optimize_mode': 'maximize' } Parameters ---------- optimize_mode : str optimize_mode is a string that including two mode "maximize" and "minimize" no_resampling : bool True or False. Should Metis consider re-sampling as part of the search strategy? If you are confident that the training dataset is noise-free, then you do not need re-sampling. no_candidates : bool True or False. Should Metis suggest parameters for the next benchmark? If you do not plan to do more benchmarks, Metis can skip this step. selection_num_starting_points : int How many times Metis should try to find the global optimal in the search space? The higher the number, the longer it takes to output the solution. cold_start_num : int Metis need some trial result to get cold start. when the number of trial result is less than cold_start_num, Metis will randomly sample hyper-parameter for trial. exploration_probability: float The probability of Metis to select parameter from exploration instead of exploitation. """ def __init__( self, optimize_mode="maximize", no_resampling=True, no_candidates=False, selection_num_starting_points=600, cold_start_num=10, exploration_probability=0.9): self.samples_x = [] self.samples_y = [] self.samples_y_aggregation = [] self.total_data = [] self.space = None self.no_resampling = no_resampling self.no_candidates = no_candidates self.optimize_mode = OptimizeMode(optimize_mode) self.key_order = [] self.cold_start_num = cold_start_num self.selection_num_starting_points = selection_num_starting_points self.exploration_probability = exploration_probability self.minimize_constraints_fun = None self.minimize_starting_points = None self.supplement_data_num = 0 # The constration of parameters self.x_bounds = [] # The type of parameters self.x_types = [] def update_search_space(self, search_space): """ Update the self.x_bounds and self.x_types by the search_space.json Parameters ---------- search_space : dict """ validate_search_space(search_space, ['choice', 'randint', 'uniform', 'quniform']) self.x_bounds = [[] for i in range(len(search_space))] self.x_types = [NONE_TYPE for i in range(len(search_space))] for key in search_space: self.key_order.append(key) key_type = {} if isinstance(search_space, dict): for key in search_space: key_type = search_space[key]['_type'] key_range = search_space[key]['_value'] idx = self.key_order.index(key) if key_type == 'quniform': if key_range[2] == 1 and key_range[0].is_integer( ) and key_range[1].is_integer(): self.x_bounds[idx] = [key_range[0], key_range[1] + 1] self.x_types[idx] = 'range_int' else: low, high, q = key_range bounds = np.clip( np.arange( np.round( low / q), np.round( high / q) + 1) * q, low, high) self.x_bounds[idx] = bounds self.x_types[idx] = 'discrete_int' elif key_type == 'randint': self.x_bounds[idx] = [key_range[0], key_range[1]] self.x_types[idx] = 'range_int' elif key_type == 'uniform': self.x_bounds[idx] = [key_range[0], key_range[1]] self.x_types[idx] = 'range_continuous' elif key_type == 'choice': self.x_bounds[idx] = key_range for key_value in key_range: if not isinstance(key_value, (int, float)): raise RuntimeError( "Metis Tuner only support numerical choice.") self.x_types[idx] = 'discrete_int' else: logger.info( "Metis Tuner doesn't support this kind of variable: %s", str(key_type)) raise RuntimeError( "Metis Tuner doesn't support this kind of variable: %s" % str(key_type)) else: logger.info("The format of search space is not a dict.") raise RuntimeError("The format of search space is not a dict.") self.minimize_starting_points = _rand_init( self.x_bounds, self.x_types, self.selection_num_starting_points) def _pack_output(self, init_parameter): """ Pack the output Parameters ---------- init_parameter : dict Returns ------- output : dict """ output = {} for i, param in enumerate(init_parameter): output[self.key_order[i]] = param return output def generate_parameters(self, parameter_id, **kwargs): """ Generate next parameter for trial If the number of trial result is lower than cold start number, metis will first random generate some parameters. Otherwise, metis will choose the parameters by the Gussian Process Model and the Gussian Mixture Model. Parameters ---------- parameter_id : int Returns ------- result : dict """ if len(self.samples_x) < self.cold_start_num: init_parameter = _rand_init(self.x_bounds, self.x_types, 1)[0] results = self._pack_output(init_parameter) else: self.minimize_starting_points = _rand_init( self.x_bounds, self.x_types, self.selection_num_starting_points) results = self._selection( self.samples_x, self.samples_y_aggregation, self.samples_y, self.x_bounds, self.x_types, threshold_samplessize_resampling=( None if self.no_resampling is True else 50), no_candidates=self.no_candidates, minimize_starting_points=self.minimize_starting_points, minimize_constraints_fun=self.minimize_constraints_fun) logger.info("Generate paramageters: \n%s", str(results)) return results def receive_trial_result(self, parameter_id, parameters, value, **kwargs): """ Tuner receive result from trial. Parameters ---------- parameter_id : int The id of parameters, generated by nni manager. parameters : dict A group of parameters that trial has tried. value : dict/float if value is dict, it should have "default" key. """ value = extract_scalar_reward(value) if self.optimize_mode == OptimizeMode.Maximize: value = -value logger.info("Received trial result.") logger.info("value is : %s", str(value)) logger.info("parameter is : %s", str(parameters)) # parse parameter to sample_x sample_x = [0 for i in range(len(self.key_order))] for key in parameters: idx = self.key_order.index(key) sample_x[idx] = parameters[key] # parse value to sample_y temp_y = [] if sample_x in self.samples_x: idx = self.samples_x.index(sample_x) temp_y = self.samples_y[idx] temp_y.append(value) self.samples_y[idx] = temp_y # calculate y aggregation median = get_median(temp_y) self.samples_y_aggregation[idx] = [median] else: self.samples_x.append(sample_x) self.samples_y.append([value]) # calculate y aggregation self.samples_y_aggregation.append([value]) def _selection( self, samples_x, samples_y_aggregation, samples_y, x_bounds, x_types, max_resampling_per_x=3, threshold_samplessize_exploitation=12, threshold_samplessize_resampling=50, no_candidates=False, minimize_starting_points=None, minimize_constraints_fun=None): with warnings.catch_warnings(): warnings.simplefilter("ignore") next_candidate = None candidates = [] samples_size_all = sum([len(i) for i in samples_y]) samples_size_unique = len(samples_y) # ===== STEP 1: Compute the current optimum ===== gp_model = gp_create_model.create_model( samples_x, samples_y_aggregation) lm_current = gp_selection.selection( "lm", samples_y_aggregation, x_bounds, x_types, gp_model['model'], minimize_starting_points, minimize_constraints_fun=minimize_constraints_fun) if not lm_current: return None logger.info({ 'hyperparameter': lm_current['hyperparameter'], 'expected_mu': lm_current['expected_mu'], 'expected_sigma': lm_current['expected_sigma'], 'reason': "exploitation_gp" }) if no_candidates is False: # ===== STEP 2: Get recommended configurations for exploration ==== results_exploration = gp_selection.selection( "lc", samples_y_aggregation, x_bounds, x_types, gp_model['model'], minimize_starting_points, minimize_constraints_fun=minimize_constraints_fun) if results_exploration is not None: if _num_past_samples(results_exploration['hyperparameter'], samples_x, samples_y) == 0: temp_candidate = { 'hyperparameter': results_exploration['hyperparameter'], 'expected_mu': results_exploration['expected_mu'], 'expected_sigma': results_exploration['expected_sigma'], 'reason': "exploration" } candidates.append(temp_candidate) logger.info("DEBUG: 1 exploration candidate selected\n") logger.info(temp_candidate) else: logger.info("DEBUG: No suitable exploration candidates were") # ===== STEP 3: Get recommended configurations for exploitation === if samples_size_all >= threshold_samplessize_exploitation: logger.info("Getting candidates for exploitation...\n") try: gmm = gmm_create_model.create_model( samples_x, samples_y_aggregation) if ("discrete_int" in x_types) or ("range_int" in x_types): results_exploitation = gmm_selection.selection( x_bounds, x_types, gmm['clusteringmodel_good'], gmm['clusteringmodel_bad'], minimize_starting_points, minimize_constraints_fun=minimize_constraints_fun) else: # If all parameters are of "range_continuous", # let's use GMM to generate random starting points results_exploitation = gmm_selection.selection_r( x_bounds, x_types, gmm['clusteringmodel_good'], gmm['clusteringmodel_bad'], num_starting_points=self.selection_num_starting_points, minimize_constraints_fun=minimize_constraints_fun) if results_exploitation is not None: if _num_past_samples(results_exploitation['hyperparameter'], samples_x, samples_y) == 0: temp_expected_mu, temp_expected_sigma = \ gp_prediction.predict(results_exploitation['hyperparameter'], gp_model['model']) temp_candidate = { 'hyperparameter': results_exploitation['hyperparameter'], 'expected_mu': temp_expected_mu, 'expected_sigma': temp_expected_sigma, 'reason': "exploitation_gmm" } candidates.append(temp_candidate) logger.info( "DEBUG: 1 exploitation_gmm candidate selected\n") logger.info(temp_candidate) else: logger.info( "DEBUG: No suitable exploitation_gmm candidates were found\n") except ValueError as exception: # The exception: ValueError: Fitting the mixture model failed # because some components have ill-defined empirical covariance # (for instance caused by singleton or collapsed samples). # Try to decrease the number of components, or increase # reg_covar. logger.info( "DEBUG: No suitable exploitation_gmm \ candidates were found due to exception.") logger.info(exception) # ===== STEP 4: Get a list of outliers ===== if (threshold_samplessize_resampling is not None) and \ (samples_size_unique >= threshold_samplessize_resampling): logger.info("Getting candidates for re-sampling...\n") results_outliers = gp_outlier_detection.outlierDetection_threaded( samples_x, samples_y_aggregation) if results_outliers is not None: for results_outlier in results_outliers: # pylint: disable=not-an-iterable if _num_past_samples(samples_x[results_outlier['samples_idx']], samples_x, samples_y) < max_resampling_per_x: temp_candidate = {'hyperparameter': samples_x[results_outlier['samples_idx']],\ 'expected_mu': results_outlier['expected_mu'],\ 'expected_sigma': results_outlier['expected_sigma'],\ 'reason': "resampling"} candidates.append(temp_candidate) logger.info("DEBUG: %d re-sampling candidates selected\n") logger.info(temp_candidate) else: logger.info( "DEBUG: No suitable resampling candidates were found\n") if candidates: # ===== STEP 5: Compute the information gain of each candidate logger.info( "Evaluating information gain of %d candidates...\n") next_improvement = 0 threads_inputs = [[ candidate, samples_x, samples_y, x_bounds, x_types, minimize_constraints_fun, minimize_starting_points ] for candidate in candidates] threads_pool = ThreadPool(4) # Evaluate what would happen if we actually sample each # candidate threads_results = threads_pool.map( _calculate_lowest_mu_threaded, threads_inputs) threads_pool.close() threads_pool.join() for threads_result in threads_results: if threads_result['expected_lowest_mu'] < lm_current['expected_mu']: # Information gain temp_improvement = threads_result['expected_lowest_mu'] - \ lm_current['expected_mu'] if next_improvement > temp_improvement: next_improvement = temp_improvement next_candidate = threads_result['candidate'] else: # ===== STEP 6: If we have no candidates, randomly pick one === logger.info( "DEBUG: No candidates from exploration, exploitation,\ and resampling. We will random a candidate for next_candidate\n" ) next_candidate = _rand_with_constraints( x_bounds, x_types) if minimize_starting_points is None else minimize_starting_points[0] next_candidate = lib_data.match_val_type( next_candidate, x_bounds, x_types) expected_mu, expected_sigma = gp_prediction.predict( next_candidate, gp_model['model']) next_candidate = { 'hyperparameter': next_candidate, 'reason': "random", 'expected_mu': expected_mu, 'expected_sigma': expected_sigma} # STEP 7: If current optimal hyperparameter occurs in the history # or exploration probability is less than the threshold, take next # config as exploration step outputs = self._pack_output(lm_current['hyperparameter']) ap = random.uniform(0, 1) if outputs in self.total_data or ap <= self.exploration_probability: if next_candidate is not None: outputs = self._pack_output(next_candidate['hyperparameter']) else: random_parameter = _rand_init(x_bounds, x_types, 1)[0] outputs = self._pack_output(random_parameter) self.total_data.append(outputs) return outputs def import_data(self, data): """ Import additional data for tuning Parameters ---------- data : a list of dict each of which has at least two keys: 'parameter' and 'value'. """ _completed_num = 0 for trial_info in data: logger.info("Importing data, current processing progress %s / %s", _completed_num, len(data)) _completed_num += 1 assert "parameter" in trial_info _params = trial_info["parameter"] assert "value" in trial_info _value = trial_info['value'] if not _value: logger.info("Useless trial data, value is %s, skip this trial data.", _value) continue self.supplement_data_num += 1 _parameter_id = '_'.join( ["ImportData", str(self.supplement_data_num)]) self.total_data.append(_params) self.receive_trial_result( parameter_id=_parameter_id, parameters=_params, value=_value) logger.info("Successfully import data to metis tuner.")
def _rand_with_constraints(x_bounds, x_types): outputs = None x_bounds_withconstraints = [x_bounds[i] for i in CONSTRAINT_PARAMS_IDX] x_types_withconstraints = [x_types[i] for i in CONSTRAINT_PARAMS_IDX] x_val_withconstraints = lib_constraint_summation.rand( x_bounds_withconstraints, x_types_withconstraints, CONSTRAINT_LOWERBOUND, CONSTRAINT_UPPERBOUND) if not x_val_withconstraints: outputs = [None] * len(x_bounds) for i, _ in enumerate(CONSTRAINT_PARAMS_IDX): outputs[CONSTRAINT_PARAMS_IDX[i]] = x_val_withconstraints[i] for i, output in enumerate(outputs): if not output: outputs[i] = random.randint(x_bounds[i][0], x_bounds[i][1]) return outputs def _calculate_lowest_mu_threaded(inputs): [candidate, samples_x, samples_y, x_bounds, x_types, minimize_constraints_fun, minimize_starting_points] = inputs outputs = {"candidate": candidate, "expected_lowest_mu": None} for expected_mu in [ candidate['expected_mu'] + 1.96 * candidate['expected_sigma'], candidate['expected_mu'] - 1.96 * candidate['expected_sigma']]: temp_samples_x = copy.deepcopy(samples_x) temp_samples_y = copy.deepcopy(samples_y) try: idx = temp_samples_x.index(candidate['hyperparameter']) # This handles the case of re-sampling a potential outlier temp_samples_y[idx].append(expected_mu) except ValueError: temp_samples_x.append(candidate['hyperparameter']) temp_samples_y.append([expected_mu]) # Aggregates multiple observation of the sample sampling points temp_y_aggregation = [statistics.median( temp_sample_y) for temp_sample_y in temp_samples_y] temp_gp = gp_create_model.create_model( temp_samples_x, temp_y_aggregation) temp_results = gp_selection.selection( "lm", temp_y_aggregation, x_bounds, x_types, temp_gp['model'], minimize_starting_points, minimize_constraints_fun=minimize_constraints_fun) if outputs["expected_lowest_mu"] is None \ or outputs["expected_lowest_mu"] > temp_results['expected_mu']: outputs["expected_lowest_mu"] = temp_results['expected_mu'] return outputs def _num_past_samples(x, samples_x, samples_y): try: idx = samples_x.index(x) return len(samples_y[idx]) except ValueError: logger.info("x not in sample_x") return 0 def _rand_init(x_bounds, x_types, selection_num_starting_points): ''' Random sample some init seed within bounds. ''' return [lib_data.rand(x_bounds, x_types) for i in range(0, selection_num_starting_points)] def get_median(temp_list): """ Return median """ num = len(temp_list) temp_list.sort() print(temp_list) if num % 2 == 0: median = (temp_list[int(num / 2)] + temp_list[int(num / 2) - 1]) / 2 else: median = temp_list[int(num / 2)] return median