Source code for nni.hyperopt_tuner.hyperopt_tuner

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""
hyperopt_tuner.py: NNI hyperparameter tuner based on the hyperopt library.
"""

import copy
import logging

import hyperopt as hp
import numpy as np
from nni.tuner import Tuner
from nni.utils import NodeType, OptimizeMode, extract_scalar_reward, split_index

logger = logging.getLogger('hyperopt_AutoML')


def json2space(in_x, name=NodeType.ROOT):
    """
    Change json to search space in hyperopt.

    Parameters
    ----------
    in_x : dict/list/str/int/float
        The part of json.
    name : str
        name could be NodeType.ROOT, NodeType.TYPE, NodeType.VALUE or NodeType.INDEX, NodeType.NAME.
    """
    out_y = copy.deepcopy(in_x)
    if isinstance(in_x, dict):
        if NodeType.TYPE in in_x.keys():
            _type = in_x[NodeType.TYPE]
            name = name + '-' + _type
            _value = json2space(in_x[NodeType.VALUE], name=name)
            if _type == 'choice':
                out_y = hp.hp.choice(name, _value)
            elif _type == 'randint':
                out_y = hp.hp.randint(name, _value[1] - _value[0])
            else:
                if _type in ['loguniform', 'qloguniform']:
                    _value[:2] = np.log(_value[:2])
                out_y = getattr(hp.hp, _type)(name, *_value)
        else:
            out_y = dict()
            for key in in_x.keys():
                out_y[key] = json2space(in_x[key], name + '[%s]' % str(key))
    elif isinstance(in_x, list):
        out_y = list()
        for i, x_i in enumerate(in_x):
            if isinstance(x_i, dict):
                if NodeType.NAME not in x_i.keys():
                    raise RuntimeError(
                        '\'_name\' key is not found in this nested search space.'
                    )
            out_y.append(json2space(x_i, name + '[%d]' % i))
    return out_y
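
# A minimal usage sketch (the search-space snippet below is an assumed
# example, not taken from the NNI docs): json2space turns an NNI-style JSON
# definition into a hyperopt expression, naming each parameter by its path
# plus its '_type'.
#
#   spec = {
#       'optimizer': {'_type': 'choice', '_value': ['sgd', 'adam']},
#       'learning_rate': {'_type': 'loguniform', '_value': [1e-4, 1e-1]},
#   }
#   space = json2space(spec)
#   # space['optimizer'] is hp.hp.choice('root[optimizer]-choice', ...) and
#   # space['learning_rate'] is hp.hp.loguniform with log-transformed bounds.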


def json2parameter(in_x, parameter, name=NodeType.ROOT):
    """
    Change json to parameters.
    """
    out_y = copy.deepcopy(in_x)
    if isinstance(in_x, dict):
        if NodeType.TYPE in in_x.keys():
            _type = in_x[NodeType.TYPE]
            name = name + '-' + _type
            if _type == 'choice':
                _index = parameter[name]
                out_y = {
                    NodeType.INDEX:
                    _index,
                    NodeType.VALUE:
                    json2parameter(in_x[NodeType.VALUE][_index],
                                   parameter,
                                   name=name + '[%d]' % _index)
                }
            else:
                if _type in ['quniform', 'qloguniform']:
                    out_y = np.clip(parameter[name], in_x[NodeType.VALUE][0], in_x[NodeType.VALUE][1])
                elif _type == 'randint':
                    out_y = parameter[name] + in_x[NodeType.VALUE][0]
                else:
                    out_y = parameter[name]
        else:
            out_y = dict()
            for key in in_x.keys():
                out_y[key] = json2parameter(in_x[key], parameter,
                                            name + '[%s]' % str(key))
    elif isinstance(in_x, list):
        out_y = list()
        for i, x_i in enumerate(in_x):
            if isinstance(x_i, dict):
                if NodeType.NAME not in x_i.keys():
                    raise RuntimeError(
                        '\'_name\' key is not found in this nested search space.'
                    )
            out_y.append(json2parameter(x_i, parameter, name + '[%d]' % i))
    return out_y
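
# Continuing the sketch above (assumed example values): hyperopt returns a
# flat dict keyed by the generated names, and json2parameter restores the
# NNI format, wrapping 'choice' values with '_index'/'_value'.
#
#   parameter = {'root[optimizer]-choice': 1,
#                'root[learning_rate]-loguniform': 0.01}
#   json2parameter(spec, parameter)
#   # -> {'optimizer': {'_index': 1, '_value': 'adam'}, 'learning_rate': 0.01}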


def json2vals(in_x, vals, out_y, name=NodeType.ROOT):
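    """
    Flatten NNI-format parameter values into hyperopt's flat ``vals``
    representation, writing the result into ``out_y``.
    """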
    if isinstance(in_x, dict):
        if NodeType.TYPE in in_x.keys():
            _type = in_x[NodeType.TYPE]
            name = name + '-' + _type

            # ``vals`` is indexable by NodeType.INDEX only for nested choice
            # values; for plain values the lookup fails and ``vals`` itself
            # is recorded.
            try:
                out_y[name] = vals[NodeType.INDEX]
            except (KeyError, TypeError):
                out_y[name] = vals

            if _type == 'choice':
                _index = vals[NodeType.INDEX]
                json2vals(in_x[NodeType.VALUE][_index],
                          vals[NodeType.VALUE],
                          out_y,
                          name=name + '[%d]' % _index)
            if _type == 'randint':
                out_y[name] -= in_x[NodeType.VALUE][0]
        else:
            for key in in_x.keys():
                json2vals(in_x[key], vals[key], out_y,
                          name + '[%s]' % str(key))
    elif isinstance(in_x, list):
        for i, temp in enumerate(in_x):
            # nested json
            if isinstance(temp, dict):
                if NodeType.NAME not in temp.keys():
                    raise RuntimeError(
                        '\'_name\' key is not found in this nested search space.'
                    )
                else:
                    json2vals(temp, vals[i], out_y, name + '[%d]' % i)
            else:
                json2vals(temp, vals[i], out_y, name + '[%d]' % i)


def _add_index(in_x, parameter):
    """
    change parameters in NNI format to parameters in hyperopt format(This function also support nested dict.).
    For example, receive parameters like:
        {'dropout_rate': 0.8, 'conv_size': 3, 'hidden_size': 512}
    Will change to format in hyperopt, like:
        {'dropout_rate': 0.8, 'conv_size': {'_index': 1, '_value': 3}, 'hidden_size': {'_index': 1, '_value': 512}}
    """
    if NodeType.TYPE not in in_x: # if at the top level
        out_y = dict()
        for key, value in parameter.items():
            out_y[key] = _add_index(in_x[key], value)
        return out_y
    elif isinstance(in_x, dict):
        value_type = in_x[NodeType.TYPE]
        value_format = in_x[NodeType.VALUE]
        if value_type == "choice":
            choice_name = parameter[0] if isinstance(parameter,
                                                     list) else parameter
            for pos, item in enumerate(
                    value_format):  # here value_format is a list
                if isinstance(
                        item,
                        list):  # this format is ["choice_key", format_dict]
                    choice_key = item[0]
                    choice_value_format = item[1]
                    if choice_key == choice_name:
                        return {
                            NodeType.INDEX: pos,
                            NodeType.VALUE: [
                                choice_name,
                                _add_index(choice_value_format, parameter[1])
                            ]
                        }
                elif choice_name == item:
                    return {NodeType.INDEX: pos, NodeType.VALUE: item}
        else:
            return parameter
    return None  # note: this is not written by original author, feel free to modify if you think it's incorrect
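
# For a choice parameter (assumed example values), _add_index records the
# position of the chosen option:
#
#   spec = {'conv_size': {'_type': 'choice', '_value': [2, 3, 5, 7]}}
#   _add_index(spec, {'conv_size': 3})
#   # -> {'conv_size': {'_index': 1, '_value': 3}}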


class HyperoptTuner(Tuner):
    """
    HyperoptTuner is a tuner based on the hyperopt library.
    """

    def __init__(self, algorithm_name, optimize_mode='minimize',
                 parallel_optimize=False, constant_liar_type='min'):
        """
        Parameters
        ----------
        algorithm_name : str
            One of "tpe", "random_search" and "anneal".
        optimize_mode : str
            Either "minimize" or "maximize".
        parallel_optimize : bool
            Whether to enable the constant-liar strategy for parallel trials.
            See docs/en_US/Tuner/HyperoptTuner.md for details.
        constant_liar_type : str
            One of "min", "max" and "mean".
            See docs/en_US/Tuner/HyperoptTuner.md for details.
        """
        self.algorithm_name = algorithm_name
        self.optimize_mode = OptimizeMode(optimize_mode)
        self.json = None
        self.total_data = {}
        self.rval = None
        self.supplement_data_num = 0

        self.parallel = parallel_optimize
        if self.parallel:
            self.CL_rval = None
            self.constant_liar_type = constant_liar_type
            self.running_data = []
            self.optimal_y = None

    def _choose_tuner(self, algorithm_name):
        """
        Parameters
        ----------
        algorithm_name : str
            One of "tpe", "random_search" and "anneal".
        """
        if algorithm_name == 'tpe':
            return hp.tpe.suggest
        if algorithm_name == 'random_search':
            return hp.rand.suggest
        if algorithm_name == 'anneal':
            return hp.anneal.suggest
        raise RuntimeError('Unsupported tuner algorithm in hyperopt.')
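    # A minimal construction sketch (standalone use with assumed values;
    # inside NNI the tuner is normally instantiated from the experiment
    # config instead):
    #
    #   tuner = HyperoptTuner('tpe', optimize_mode='maximize')
    #   tuner.update_search_space({'learning_rate':
    #                              {'_type': 'loguniform', '_value': [1e-4, 1e-1]}})
    #   params = tuner.generate_parameters(parameter_id=0)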
    def update_search_space(self, search_space):
        """
        Update the search space definition in the tuner.
        Called when the experiment is first set up, and whenever the search
        space is updated in the WebUI.

        Parameters
        ----------
        search_space : dict
        """
        self.json = search_space

        search_space_instance = json2space(self.json)
        rstate = np.random.RandomState()
        trials = hp.Trials()
        domain = hp.Domain(None,
                           search_space_instance,
                           pass_expr_memo_ctrl=None)
        algorithm = self._choose_tuner(self.algorithm_name)
        self.rval = hp.FMinIter(algorithm,
                                domain,
                                trials,
                                max_evals=-1,
                                rstate=rstate,
                                verbose=0)
        self.rval.catch_eval_exceptions = False
    def generate_parameters(self, parameter_id, **kwargs):
        """
        Returns a set of trial (hyper-)parameters, as a serializable object.

        Parameters
        ----------
        parameter_id : int

        Returns
        -------
        params : dict
        """
        total_params = self.get_suggestion(random_search=False)
        # Hyperopt has no native parallel mode, so concurrent trials can
        # occasionally be handed the same parameters; fall back to a random
        # suggestion when a duplicate is detected.
        if total_params in self.total_data.values():
            total_params = self.get_suggestion(random_search=True)
        self.total_data[parameter_id] = total_params

        if self.parallel:
            self.running_data.append(parameter_id)

        params = split_index(total_params)
        return params
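    # Typical round trip (assumed values, continuing the sketch above): NNI
    # calls generate_parameters for a new trial and later reports the final
    # metric back:
    #
    #   params = tuner.generate_parameters(parameter_id=1)
    #   # ... run the trial ...
    #   tuner.receive_trial_result(parameter_id=1, parameters=params, value=0.93)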
    def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
        """
        Record an observation of the objective function.

        Parameters
        ----------
        parameter_id : int
        parameters : dict
        value : dict/float
            Final metrics of the trial. If ``value`` is a dict, it should
            have a "default" key.
        """
        reward = extract_scalar_reward(value)
        # restore the parameters that contain '_index'
        if parameter_id not in self.total_data:
            raise RuntimeError('Received parameter_id not in total_data.')
        params = self.total_data[parameter_id]

        # parallel mode
        if self.parallel:
            constant_liar = kwargs.get('constant_liar', False)

            if constant_liar:
                rval = self.CL_rval
            else:
                rval = self.rval
                # ignore duplicated final results (which may arrive because
                # intermediate results are also observed)
                if parameter_id not in self.running_data:
                    logger.info("Received duplicated final result with parameter id: %s", parameter_id)
                    return
                self.running_data.remove(parameter_id)

                # update optimal_y with the new reward
                if self.optimal_y is None:
                    if self.constant_liar_type == 'mean':
                        self.optimal_y = [reward, 1]
                    else:
                        self.optimal_y = reward
                else:
                    if self.constant_liar_type == 'mean':
                        _sum = self.optimal_y[0] + reward
                        _number = self.optimal_y[1] + 1
                        self.optimal_y = [_sum, _number]
                    elif self.constant_liar_type == 'min':
                        self.optimal_y = min(self.optimal_y, reward)
                    elif self.constant_liar_type == 'max':
                        self.optimal_y = max(self.optimal_y, reward)
                logger.debug("Update optimal_y with reward, optimal_y = %s", self.optimal_y)
        else:
            rval = self.rval

        if self.optimize_mode is OptimizeMode.Maximize:
            reward = -reward

        domain = rval.domain
        trials = rval.trials

        new_id = len(trials)

        rval_specs = [None]
        rval_results = [domain.new_result()]
        rval_miscs = [dict(tid=new_id, cmd=domain.cmd, workdir=domain.workdir)]

        vals = params
        idxs = dict()

        out_y = dict()
        json2vals(self.json, vals, out_y)
        vals = out_y
        for key in domain.params:
            if key in [NodeType.VALUE, NodeType.INDEX]:
                continue
            if key not in vals or vals[key] is None or vals[key] == []:
                idxs[key] = vals[key] = []
            else:
                idxs[key] = [new_id]
                vals[key] = [vals[key]]

        self.miscs_update_idxs_vals(rval_miscs,
                                    idxs,
                                    vals,
                                    idxs_map={new_id: new_id},
                                    assert_all_vals_used=False)

        trial = trials.new_trial_docs([new_id], rval_specs, rval_results,
                                      rval_miscs)[0]
        trial['result'] = {'loss': reward, 'status': 'ok'}
        trial['state'] = hp.JOB_STATE_DONE
        trials.insert_trial_docs([trial])
        trials.refresh()
    def miscs_update_idxs_vals(self,
                               miscs,
                               idxs,
                               vals,
                               assert_all_vals_used=True,
                               idxs_map=None):
        """
        Unpack the idxs-vals format into the list of dictionaries that is
        `misc`.

        Parameters
        ----------
        idxs_map : dict
            A dictionary of id->id mappings so that the misc['idxs'] can
            contain different numbers than the idxs argument.
        """
        if idxs_map is None:
            idxs_map = {}

        assert set(idxs.keys()) == set(vals.keys())

        misc_by_id = {m['tid']: m for m in miscs}
        for m in miscs:
            m['idxs'] = {key: [] for key in idxs}
            m['vals'] = {key: [] for key in idxs}

        for key in idxs:
            assert len(idxs[key]) == len(vals[key])
            for tid, val in zip(idxs[key], vals[key]):
                tid = idxs_map.get(tid, tid)
                if assert_all_vals_used or tid in misc_by_id:
                    misc_by_id[tid]['idxs'][key] = [tid]
                    misc_by_id[tid]['vals'][key] = [val]
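    # The idxs-vals format pairs trial ids with values per parameter
    # (assumed example values), e.g.
    #   idxs = {'root[learning_rate]-loguniform': [7]}
    #   vals = {'root[learning_rate]-loguniform': [0.01]}
    # means the trial with tid 7 used a learning rate of 0.01.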
    def get_suggestion(self, random_search=False):
        """
        Get a suggestion from hyperopt.

        Parameters
        ----------
        random_search : bool
            Whether to use random search (default: False).

        Returns
        -------
        total_params : dict
            Parameter suggestion.
        """
        if self.parallel and len(self.total_data) > 20 and self.running_data and self.optimal_y is not None:
            self.CL_rval = copy.deepcopy(self.rval)
            if self.constant_liar_type == 'mean':
                _constant_liar_y = self.optimal_y[0] / self.optimal_y[1]
            else:
                _constant_liar_y = self.optimal_y
            for _parameter_id in self.running_data:
                self.receive_trial_result(parameter_id=_parameter_id,
                                          parameters=None,
                                          value=_constant_liar_y,
                                          constant_liar=True)
            rval = self.CL_rval
            random_state = np.random.randint(2**31 - 1)
        else:
            rval = self.rval
            random_state = rval.rstate.randint(2**31 - 1)

        trials = rval.trials
        algorithm = rval.algo
        new_ids = rval.trials.new_trial_ids(1)
        rval.trials.refresh()

        if random_search:
            new_trials = hp.rand.suggest(new_ids, rval.domain, trials, random_state)
        else:
            new_trials = algorithm(new_ids, rval.domain, trials, random_state)
        rval.trials.refresh()

        vals = new_trials[0]['misc']['vals']
        parameter = dict()
        for key in vals:
            try:
                parameter[key] = vals[key][0].item()
            except (KeyError, IndexError):
                parameter[key] = None

        # remove '_index' from json2parameter and save params-id
        total_params = json2parameter(self.json, parameter)
        return total_params
    def import_data(self, data):
        """
        Import additional data for tuning.

        Parameters
        ----------
        data : list
            A list of dictionaries, each of which has at least two keys,
            'parameter' and 'value'.
        """
        _completed_num = 0
        for trial_info in data:
            logger.info("Importing data, current processing progress %s / %s", _completed_num, len(data))
            _completed_num += 1
            if self.algorithm_name == 'random_search':
                return
            assert "parameter" in trial_info
            _params = trial_info["parameter"]
            assert "value" in trial_info
            _value = trial_info['value']
            if not _value:
                logger.info("Useless trial data, value is %s, skip this trial data.", _value)
                continue
            self.supplement_data_num += 1
            _parameter_id = '_'.join(
                ["ImportData", str(self.supplement_data_num)])
            self.total_data[_parameter_id] = _add_index(in_x=self.json,
                                                        parameter=_params)
            self.receive_trial_result(parameter_id=_parameter_id,
                                      parameters=_params,
                                      value=_value)
        logger.info("Successfully imported data to TPE/Anneal tuner.")
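    # Expected import format (assumed example values): each record carries
    # the NNI-format parameters and the final metric of a finished trial.
    #
    #   tuner.import_data([
    #       {'parameter': {'learning_rate': 0.01}, 'value': 0.93},
    #       {'parameter': {'learning_rate': 0.1}, 'value': {'default': 0.88}},
    #   ])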