Source code for nni.algorithms.hpo.hyperband_advisor

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""
hyperband_advisor.py
"""

import copy
import logging
import math
import sys

import numpy as np
from schema import Schema, Optional

import nni
from nni import ClassArgsValidator
from nni.common.hpo_utils import validate_search_space
from nni.runtime.common import multi_phase_enabled
from nni.runtime.msg_dispatcher_base import MsgDispatcherBase
from nni.runtime.tuner_command_channel import CommandType
from nni.utils import NodeType, OptimizeMode, MetricType, extract_scalar_reward
from nni import parameter_expressions

_logger = logging.getLogger(__name__)

_next_parameter_id = 0
_KEY = 'TRIAL_BUDGET'
_epsilon = 1e-6


def create_parameter_id():
    """Create an id

    Returns
    -------
    int
        parameter id
    """
    global _next_parameter_id
    _next_parameter_id += 1
    return _next_parameter_id - 1


def create_bracket_parameter_id(brackets_id, brackets_curr_decay, increased_id=-1):
    """Create a full id for a specific bracket's hyperparameter configuration

    Parameters
    ----------
    brackets_id: string
        brackets id
    brackets_curr_decay:
        brackets curr decay
    increased_id: int
        increased id

    Returns
    -------
    int
        params id
    """
    if increased_id == -1:
        increased_id = str(create_parameter_id())
    params_id = '_'.join([brackets_id,
                          str(brackets_curr_decay),
                          increased_id])
    return params_id


def json2parameter(ss_spec, random_state):
    """Randomly generate values for hyperparameters from hyperparameter space i.e., x.

    Parameters
    ----------
    ss_spec:
        hyperparameter space
    random_state:
        random operator to generate random values

    Returns
    -------
    Parameter:
        Parameters in this experiment
    """
    if isinstance(ss_spec, dict):
        if NodeType.TYPE in ss_spec.keys():
            _type = ss_spec[NodeType.TYPE]
            _value = ss_spec[NodeType.VALUE]
            if _type == 'choice':
                _index = random_state.randint(len(_value))
                chosen_params = json2parameter(ss_spec[NodeType.VALUE][_index], random_state)
            else:
                chosen_params = getattr(parameter_expressions, _type)(*(_value + [random_state]))
        else:
            chosen_params = dict()
            for key in ss_spec.keys():
                chosen_params[key] = json2parameter(ss_spec[key], random_state)
    elif isinstance(ss_spec, list):
        chosen_params = list()
        for _, subspec in enumerate(ss_spec):
            chosen_params.append(json2parameter(subspec, random_state))
    else:
        chosen_params = copy.deepcopy(ss_spec)
    return chosen_params


class Bracket():
    """
    A bracket in Hyperband, all the information of a bracket is managed by an instance of this class

    Parameters
    ----------
    bracket_id: string
        The id of this bracket, usually be set as '{Hyperband index}-{SH iteration index}'
    s: int
        The current SH iteration index.
    s_max: int
        total number of SH iterations
    eta: float
        In each iteration, a complete run of sequential halving is executed. In it,
		after evaluating each configuration on the same subset size, only a fraction of
		1/eta of them 'advances' to the next round.
    R:
        the budget associated with each stage
    optimize_mode: str
        optimize mode, 'maximize' or 'minimize'
    """

    def __init__(self, bracket_id, s, s_max, eta, R, optimize_mode):
        self.bracket_id = bracket_id
        self.s = s
        self.s_max = s_max
        self.eta = eta
        self.n = math.ceil((s_max + 1) * (eta ** s) / (s + 1) - _epsilon)
        self.r = R / eta ** s
        self.i = 0
        self.hyper_configs = []  # [ {id: params}, {}, ... ]
        self.configs_perf = []  # [ {id: [seq, acc]}, {}, ... ]
        self.num_configs_to_run = []  # [ n, n, n, ... ]
        self.num_finished_configs = []  # [ n, n, n, ... ]
        self.optimize_mode = OptimizeMode(optimize_mode)
        self.no_more_trial = False

    def is_completed(self):
        """check whether this bracket has sent out all the hyperparameter configurations"""
        return self.no_more_trial

    def get_n_r(self):
        """return the values of n and r for the next round"""
        return math.floor(self.n / self.eta ** self.i + _epsilon), math.floor(self.r * self.eta ** self.i + _epsilon)

    def increase_i(self):
        """i means the ith round. Increase i by 1"""
        self.i += 1
        if self.i > self.s:
            self.no_more_trial = True

    def set_config_perf(self, i, parameter_id, seq, value):
        """update trial's latest result with its sequence number, e.g., epoch number or batch number

        Parameters
        ----------
        i: int
            the ith round
        parameter_id: int
            the id of the trial/parameter
        seq: int
            sequence number, e.g., epoch number or batch number
        value: int
            latest result with sequence number seq

        Returns
        -------
        None
        """
        if parameter_id in self.configs_perf[i]:
            if self.configs_perf[i][parameter_id][0] < seq:
                self.configs_perf[i][parameter_id] = [seq, value]
        else:
            self.configs_perf[i][parameter_id] = [seq, value]

    def inform_trial_end(self, i):
        """If the trial is finished and the corresponding round (i.e., i) has all its trials finished,
        it will choose the top k trials for the next round (i.e., i+1)

        Parameters
        ----------
        i: int
            the ith round
        """
        global _KEY
        self.num_finished_configs[i] += 1
        _logger.debug('bracket id: %d, round: %d %d, finished: %d, all: %d', self.bracket_id, self.i, i,
                      self.num_finished_configs[i], self.num_configs_to_run[i])
        if self.num_finished_configs[i] >= self.num_configs_to_run[i] \
                and self.no_more_trial is False:
            # choose candidate configs from finished configs to run in the next round
            assert self.i == i + 1
            this_round_perf = self.configs_perf[i]
            if self.optimize_mode is OptimizeMode.Maximize:
                sorted_perf = sorted(this_round_perf.items(), key=lambda kv: kv[1][1], reverse=True)  # reverse
            else:
                sorted_perf = sorted(this_round_perf.items(), key=lambda kv: kv[1][1])
            _logger.debug('bracket %s next round %s, sorted hyper configs: %s', self.bracket_id, self.i, sorted_perf)
            next_n, next_r = self.get_n_r()
            _logger.debug('bracket %s next round %s, next_n=%d, next_r=%d', self.bracket_id, self.i, next_n, next_r)
            hyper_configs = dict()
            for k in range(next_n):
                params_id = sorted_perf[k][0]
                params = self.hyper_configs[i][params_id]
                params[_KEY] = next_r  # modify r
                # generate new id
                increased_id = params_id.split('_')[-1]
                new_id = create_bracket_parameter_id(self.bracket_id, self.i, increased_id)
                hyper_configs[new_id] = params
            self._record_hyper_configs(hyper_configs)
            return [[key, value] for key, value in hyper_configs.items()]
        return None

    def get_hyperparameter_configurations(self, num, r, searchspace_json, random_state):
        """Randomly generate num hyperparameter configurations from search space

        Parameters
        ----------
        num: int
            the number of hyperparameter configurations

        Returns
        -------
        list
            a list of hyperparameter configurations. Format: [[key1, value1], [key2, value2], ...]
        """
        global _KEY
        assert self.i == 0
        hyperparameter_configs = dict()
        for _ in range(num):
            params_id = create_bracket_parameter_id(self.bracket_id, self.i)
            params = json2parameter(searchspace_json, random_state)
            params[_KEY] = r
            hyperparameter_configs[params_id] = params
        self._record_hyper_configs(hyperparameter_configs)
        return [[key, value] for key, value in hyperparameter_configs.items()]

    def _record_hyper_configs(self, hyper_configs):
        """after generating one round of hyperconfigs, this function records the generated hyperconfigs,
        creates a dict to record the performance when those hyperconifgs are running, set the number of finished configs
        in this round to be 0, and increase the round number.

        Parameters
        ----------
        hyper_configs: list
            the generated hyperconfigs
        """
        self.hyper_configs.append(hyper_configs)
        self.configs_perf.append(dict())
        self.num_finished_configs.append(0)
        self.num_configs_to_run.append(len(hyper_configs))
        self.increase_i()

class HyperbandClassArgsValidator(ClassArgsValidator):
    def validate_class_args(self, **kwargs):
        Schema({
            'optimize_mode': self.choices('optimize_mode', 'maximize', 'minimize'),
            Optional('exec_mode'): self.choices('exec_mode', 'serial', 'parallelism'),
            Optional('R'): int,
            Optional('eta'): int
        }).validate(kwargs)

[docs] class Hyperband(MsgDispatcherBase): """ `Hyperband <https://arxiv.org/pdf/1603.06560.pdf>`__ is a multi-fidelity hyperparameter tuning algorithm based on successive halving. The basic idea of Hyperband is to create several buckets, each having ``n`` randomly generated hyperparameter configurations, each configuration using ``r`` resources (e.g., epoch number, batch number). After the ``n`` configurations are finished, it chooses the top ``n/eta`` configurations and runs them using increased ``r*eta`` resources. At last, it chooses the best configuration it has found so far. Please refer to the paper :footcite:t:`li2017hyperband` for detailed algorithm. Examples -------- .. code-block:: config.tuner.name = 'Hyperband' config.tuner.class_args = { 'optimize_mode': 'maximize', 'R': 60, 'eta': 3 } Note that once you use Advisor, you are not allowed to add a Tuner and Assessor spec in the config file. When Hyperband is used, the dict returned by :func:`nni.get_next_parameter` one more key called ``TRIAL_BUDGET`` besides the hyperparameters and their values. **With this TRIAL_BUDGET, users can control in trial code how long a trial runs by following the suggested trial budget from Hyperband.** ``TRIAL_BUDGET`` is a relative number, users can interpret them as number of epochs, number of mini-batches, running time, etc. Here is a concrete example of ``R=81`` and ``eta=3``: .. list-table:: :header-rows: 1 :widths: auto * - - s=4 - s=3 - s=2 - s=1 - s=0 * - i - n r - n r - n r - n r - n r * - 0 - 81 1 - 27 3 - 9 9 - 6 27 - 5 81 * - 1 - 27 3 - 9 9 - 3 27 - 2 81 - * - 2 - 9 9 - 3 27 - 1 81 - - * - 3 - 3 27 - 1 81 - - - * - 4 - 1 81 - - - - ``s`` means bucket, ``n`` means the number of configurations that are generated, the corresponding ``r`` means how many budgets these configurations run. ``i`` means round, for example, bucket 4 has 5 rounds, bucket 3 has 4 rounds. A complete example can be found :githublink:`examples/trials/mnist-advisor`. Parameters ---------- optimize_mode: str Optimize mode, 'maximize' or 'minimize'. R: int The maximum amount of budget that can be allocated to a single configuration. Here, trial budget could mean the number of epochs, number of mini-batches, etc., depending on how users interpret it. Each trial should use ``TRIAL_BUDGET`` to control how long it runs. eta: int The variable that controls the proportion of configurations discarded in each round of SuccessiveHalving. ``1/eta`` configurations will survive and rerun using more budgets in each round. exec_mode: str Execution mode, 'serial' or 'parallelism'. If 'parallelism', the tuner will try to use available resources to start new bucket immediately. If 'serial', the tuner will only start new bucket after the current bucket is done. Notes ----- First, Hyperband an example of how to write an autoML algorithm based on MsgDispatcherBase, rather than based on Tuner and Assessor. Hyperband is implemented in this way because it integrates the functions of both Tuner and Assessor,thus, we call it Advisor. Second, this implementation fully leverages Hyperband's internal parallelism. Specifically, the next bucket is not started strictly after the current bucket. Instead, it starts when there are available resources. If you want to use full parallelism mode, set ``exec_mode`` to ``parallelism``. Or if you want to set ``exec_mode`` with ``serial`` according to the original algorithm. In this mode, the next bucket will start strictly after the current bucket. ``parallelism`` mode may lead to multiple unfinished buckets, in contrast, there is at most one unfinished bucket under ``serial`` mode. The advantage of ``parallelism`` mode is to make full use of resources, which may reduce the experiment duration multiple times. """ def __init__(self, optimize_mode='maximize', R=60, eta=3, exec_mode='parallelism'): """B = (s_max + 1)R""" super(Hyperband, self).__init__() self.R = R self.eta = eta self.brackets = dict() # dict of Bracket self.generated_hyper_configs = [] # all the configs waiting for run self.completed_hyper_configs = [] # all the completed configs self.s_max = math.floor(math.log(self.R, self.eta) + _epsilon) self.curr_s = self.s_max self.curr_hb = 0 self.exec_mode = exec_mode self.curr_bracket_id = None self.searchspace_json = None self.random_state = None self.optimize_mode = OptimizeMode(optimize_mode) # This is for the case that nnimanager requests trial config, but tuner cannot provide immediately. # In this case, tuner increases self.credit to issue a trial config sometime later. self.credit = 0 # record the latest parameter_id of the trial job trial_job_id. # if there is no running parameter_id, self.job_id_para_id_map[trial_job_id] == None # new trial job is added to this dict and finished trial job is removed from it. self.job_id_para_id_map = dict() def handle_initialize(self, data): """callback for initializing the advisor Parameters ---------- data: dict search space """ self.handle_update_search_space(data) self.send(CommandType.Initialized, '') def handle_request_trial_jobs(self, data): """ Parameters ---------- data: int number of trial jobs """ self.credit += data for _ in range(self.credit): self._request_one_trial_job() def _request_one_trial_job(self): ret = self._get_one_trial_job() if ret is not None: self.send(CommandType.NewTrialJob, nni.dump(ret)) self.credit -= 1 def _get_one_trial_job(self): """get one trial job, i.e., one hyperparameter configuration.""" if not self.generated_hyper_configs: if self.exec_mode == 'parallelism' or \ (self.exec_mode == 'serial' and (self.curr_bracket_id is None or self.brackets[self.curr_bracket_id].is_completed())): if self.curr_s < 0: self.curr_s = self.s_max self.curr_hb += 1 _logger.debug('create a new bracket, self.curr_hb=%d, self.curr_s=%d', self.curr_hb, self.curr_s) self.curr_bracket_id = '{}-{}'.format(self.curr_hb, self.curr_s) self.brackets[self.curr_bracket_id] = Bracket( self.curr_bracket_id, self.curr_s, self.s_max, self.eta, self.R, self.optimize_mode) next_n, next_r = self.brackets[self.curr_bracket_id].get_n_r() _logger.debug('new bracket, next_n=%d, next_r=%d', next_n, next_r) assert self.searchspace_json is not None and self.random_state is not None generated_hyper_configs = self.brackets[self.curr_bracket_id].get_hyperparameter_configurations(next_n, next_r, self.searchspace_json, self.random_state) self.generated_hyper_configs = generated_hyper_configs.copy() self.curr_s -= 1 else: ret = { 'parameter_id': '-1_0_0', 'parameter_source': 'algorithm', 'parameters': '' } self.send(CommandType.NoMoreTrialJobs, nni.dump(ret)) return None assert self.generated_hyper_configs params = self.generated_hyper_configs.pop(0) ret = { 'parameter_id': params[0], 'parameter_source': 'algorithm', 'parameters': params[1] } return ret def handle_update_search_space(self, data): """data: JSON object, which is search space """ validate_search_space(data) self.searchspace_json = data self.random_state = np.random.RandomState() def _handle_trial_end(self, parameter_id): """ Parameters ---------- parameter_id: parameter id of the finished config """ bracket_id, i, _ = parameter_id.split('_') hyper_configs = self.brackets[bracket_id].inform_trial_end(int(i)) if hyper_configs is not None: _logger.debug('bracket %s next round %s, hyper_configs: %s', bracket_id, i, hyper_configs) self.generated_hyper_configs = self.generated_hyper_configs + hyper_configs for _ in range(self.credit): self._request_one_trial_job() def handle_trial_end(self, data): """ Parameters ---------- data: dict() it has three keys: trial_job_id, event, hyper_params trial_job_id: the id generated by training service event: the job's state hyper_params: the hyperparameters (a string) generated and returned by tuner """ hyper_params = nni.load(data['hyper_params']) if self.is_created_in_previous_exp(hyper_params['parameter_id']): # The end of the recovered trial is ignored return self._handle_trial_end(hyper_params['parameter_id']) if data['trial_job_id'] in self.job_id_para_id_map: del self.job_id_para_id_map[data['trial_job_id']] def handle_report_metric_data(self, data): """ Parameters ---------- data: it is an object which has keys 'parameter_id', 'value', 'trial_job_id', 'type', 'sequence'. Raises ------ ValueError Data type not supported """ if self.is_created_in_previous_exp(data['parameter_id']): # do not support recovering the algorithm state return if 'value' in data: data['value'] = nni.load(data['value']) # multiphase? need to check if data['type'] == MetricType.REQUEST_PARAMETER: assert multi_phase_enabled() assert data['trial_job_id'] is not None assert data['parameter_index'] is not None assert data['trial_job_id'] in self.job_id_para_id_map self._handle_trial_end(self.job_id_para_id_map[data['trial_job_id']]) ret = self._get_one_trial_job() if data['trial_job_id'] is not None: ret['trial_job_id'] = data['trial_job_id'] if data['parameter_index'] is not None: ret['parameter_index'] = data['parameter_index'] self.job_id_para_id_map[data['trial_job_id']] = ret['parameter_id'] self.send(CommandType.SendTrialJobParameter, nni.dump(ret)) else: value = extract_scalar_reward(data['value']) bracket_id, i, _ = data['parameter_id'].split('_') # add <trial_job_id, parameter_id> to self.job_id_para_id_map here, # because when the first parameter_id is created, trial_job_id is not known yet. if data['trial_job_id'] in self.job_id_para_id_map: assert self.job_id_para_id_map[data['trial_job_id']] == data['parameter_id'] else: self.job_id_para_id_map[data['trial_job_id']] = data['parameter_id'] if data['type'] == MetricType.FINAL: # sys.maxsize indicates this value is from FINAL metric data, because data['sequence'] from FINAL metric # and PERIODICAL metric are independent, thus, not comparable. self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], sys.maxsize, value) self.completed_hyper_configs.append(data) elif data['type'] == MetricType.PERIODICAL: self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], data['sequence'], value) else: raise ValueError('Data type not supported: {}'.format(data['type'])) def handle_add_customized_trial(self, data): global _next_parameter_id # data: parameters previous_max_param_id = self.recover_parameter_id(data) _next_parameter_id = previous_max_param_id + 1 def handle_import_data(self, data): pass