Source code for nni.algorithms.hpo.gridsearch_tuner

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""
Grid search tuner for hyper-parameter optimization.

For categorical parameters this tuner fully explores all combinations.
For numerical parameters it samples them at progressively decreased intervals.

Use this tuner if you have abundant resources and want to find strictly optimal parameters.

Grid search tuner has no argument.
"""

__all__ = ['GridSearchTuner']

import logging
import math

import numpy as np
from scipy.special import erfinv  # pylint: disable=no-name-in-module

import nni
from nni.common.hpo_utils import ParameterSpec, deformat_parameters, format_search_space
from nni.tuner import Tuner

_logger = logging.getLogger('nni.tuner.gridsearch')

##
# Grid search is a simple algorithm if only categorical parameters are considered.
# But to support continuous space, things get tricky.
#
# To support continuous space, we divide search process into "epochs".
# The first epoch only explores middle point of uniform and normal parameters.
# When first epoch is fully explored, the algorithm starts second epoch,
# where it divides non-categorical spaces by adding quartile points into the grid.
# Then in third epoch it adds [1/8, 3/8, 5/8, 7/8], and so on.
#
# We divide normal distributed spaces using inverse function of CDF.
# For example the 1/4 point of a normal distribution is defined as X where `normal_cdf(X) = 1/4`.
#
# Here is an example:
#
#   search space:
#     x: choices(5, 7)
#     y: normal(0, 1)
#     z: quniform(2, 3, 1)
#
#   grid of first epoch:
#     x: [5, 7]
#     y: [1/2]
#     z: [1/2]  (results in [2], because round(2.5) == 2)
#   generated parameters:
#     (5,0,2) (7,0,2)
#
#   grid of second epoch:
#     x: [5, 7]
#     y: [1/2, 1/4, 3/4]  (results in [0, -0.67, 0.67])
#     z: [1/2, 3/4]  (results in [2, 3], 1/4 is eliminated due to duplication)
#   generated parameters:
#     (5,0,3)    (5,-0.67,2) (5,-0.67,3)    (5,0.67,2) (5,0.67,3)
#     (7,0,3)    (7,-0.67,2) (7,-0.67,3)    (7,0.67,2) (7,0.67,3)
##

class GridSearchTuner(Tuner):
    """
    Tuner that walks through a grid which gets refined epoch by epoch.

    Categorical parameters put all of their choices into the grid from the first epoch.
    Non-categorical parameters start with a single middle point (0.5),
    and each following epoch adds the middle points of the intervals that can still be divided.
    The search ends when an epoch cannot add any new point to the grid.
    """

    def __init__(self):
        self.space = None

        # the grid to search in this epoch
        # when the space is fully explored, grid is set to None
        self.grid = None  # list[list[int | float]]

        # a parameter set is internally expressed as a vector
        # for each dimension i, self.vector[i] is the parameter's index in self.grid[i]
        # in second epoch of above example, vector [1, 2, 0] means parameters {x: 7, y: 0.67, z: 2}
        self.vector = None  # list[int]

        # this tells which parameters are derived from previous epoch
        # in second epoch of above example, epoch_bar is [2, 1, 1]
        self.epoch_bar = None  # list[int]

        # this stores which intervals are possibly divisible (low < high after log and q)
        # in first epoch of above example, divisions are:
        #     {1: [(0,1/2), (1/2,1)], 2: [(1/2,1)]}
        # in second epoch:
        #     {1: [(0,1/4), (1/4,1/2), (1/2,3/4), (3/4,1)], 2: [(1/2,3/4)]}
        # and in third epoch:
        #     {1: [(0,1/8), ..., (7/8,1)], 2: []}
        self.divisions = {}  # dict[int, list[tuple[float, float]]]

        # dumped JSON string of all tried parameters
        self.history = set()

    def update_search_space(self, space):
        """
        Format ``space``, store it, and (re)build the first-epoch grid.

        Raises ``ValueError`` if the formatted search space is empty.
        """
        self.space = format_search_space(space)
        if not self.space:  # the tuner will crash in this case, report it explicitly
            raise ValueError('Search space is empty')
        self._init_grid()

    def generate_parameters(self, *args, **kwargs):
        """
        Return the next parameter set that is not yet recorded in ``self.history``.

        Positional and keyword arguments are ignored.
        Raises ``nni.NoMoreTrialError`` once ``_suggest()`` reports that nothing is left.
        """
        while True:
            params = self._suggest()
            if params is None:
                raise nni.NoMoreTrialError('Search space fully explored')
            params = deformat_parameters(params, self.space)

            # different vectors may produce identical parameters, so dedup on the dumped string
            params_str = nni.dump(params, sort_keys=True)
            if params_str not in self.history:
                self.history.add(params_str)
                return params

    def receive_trial_result(self, *args, **kwargs):
        """Do nothing; trial results are not used by this tuner."""
        pass

    def import_data(self, data):
        """
        Record the ``'parameter'`` entry of every trial in ``data`` as already tried.
        """
        # TODO
        # use tuple to dedup in case of order/precision issue causes matching failed
        # and remove `epoch_bar` to use uniform dedup mechanism
        for trial in data:
            params_str = nni.dump(trial['parameter'], sort_keys=True)
            self.history.add(params_str)

    def _suggest(self):
        # returns next parameter set, or None if the space is already fully explored
        while True:
            if self.grid is None:  # search space fully explored
                return None

            self._next_vector()

            if self.vector is None:  # epoch end, update grid and retry
                self._next_grid()
                continue

            # a vector whose every index is below the bar only uses values of past epochs
            old = all(index < bar for index, bar in zip(self.vector, self.epoch_bar))
            if old:  # already explored in past epochs
                continue

            # this vector is valid, stop
            _logger.debug('vector: %s', self.vector)
            return self._current_parameters()

    def _next_vector(self):
        # iterate to next vector of this epoch, set vector to None if epoch end
        if self.vector is None:  # first vector in this epoch
            self.vector = [0] * len(self.space)
            return

        # deal with nested choice, don't touch nested spaces that are not chosen by current vector
        params = self._current_parameters()
        activated_dims = [
            i for i, spec in enumerate(self.space.values())
            if spec.is_activated_in(params)
        ]

        # increase the vector like a counter, the last activated dimension changes fastest
        for i in reversed(activated_dims):
            if self.vector[i] + 1 < len(self.grid[i]):
                self.vector[i] += 1
                return
            self.vector[i] = 0

        self.vector = None  # the loop ends without returning, no more vector in this epoch

    def _next_grid(self):
        # update grid information (grid, epoch_bar, divisions) for next epoch
        updated = False
        for i, spec in enumerate(self.space.values()):
            # everything currently in the grid belongs to past epochs from now on
            self.epoch_bar[i] = len(self.grid[i])
            if not spec.categorical:
                # further divide intervals
                new_vals = []  # values to append to grid
                new_divs = []  # sub-intervals
                for left, right in self.divisions[i]:
                    mid = (left + right) / 2
                    diff_l = _less(left, mid, spec)
                    diff_r = _less(mid, right, spec)
                    if diff_l and diff_r:  # we can skip these for non-q, but it will complicate the code
                        new_vals.append(mid)
                        updated = True
                    if diff_l:
                        new_divs.append((left, mid))
                    if diff_r:
                        new_divs.append((mid, right))
                self.grid[i] += new_vals
                self.divisions[i] = new_divs

        if not updated:  # fully explored
            _logger.info('Search space has been fully explored')
            self.grid = None
        else:
            size = _grid_size_info(self.grid)
            _logger.info('Grid subdivided, new size: %s', size)

    def _init_grid(self):
        # build the grid of first epoch for self.space
        # traversal state is reset as well, otherwise updating the search space again
        # would resume from a vector that was positioned (and sized) for the previous grid
        self.vector = None
        self.divisions = {}

        self.epoch_bar = [0 for _ in self.space]
        self.grid = [None for _ in self.space]
        for i, spec in enumerate(self.space.values()):
            if spec.categorical:
                self.grid[i] = list(range(spec.size))
            else:
                self.grid[i] = [0.5]
                self.divisions[i] = []
                if _less(0, 0.5, spec):
                    self.divisions[i].append((0, 0.5))
                if _less(0.5, 1, spec):
                    self.divisions[i].append((0.5, 1))

        size = _grid_size_info(self.grid)
        _logger.info('Grid initialized, size: %s', size)

    def _current_parameters(self):
        # convert self.vector to "formatted" parameters
        params = {}
        for i, spec in enumerate(self.space.values()):
            # params is filled in order, so activation is checked against the dimensions before i
            if spec.is_activated_in(params):
                x = self.grid[i][self.vector[i]]
                if spec.categorical:
                    params[spec.key] = x
                else:
                    params[spec.key] = _cdf_inverse(x, spec)
        return params
def _less(x, y, spec):
    """
    Tell whether grid point ``x`` yields a strictly smaller real value than grid point ``y``.

    Both points are mapped through ``_cdf_inverse()`` and ``_deformat_single_parameter()``
    before being compared.
    """
    # NOTE: a plain `x < y` shortcut for `spec.q is None` used to live here,
    # it is disabled because of edge case UT uniform(99.9, 99.9)
    real_x = _deformat_single_parameter(_cdf_inverse(x, spec), spec)
    real_y = _deformat_single_parameter(_cdf_inverse(y, spec), spec)
    return real_x < real_y


def _cdf_inverse(x, spec):
    """Inverse function of spec's cumulative distribution function, evaluated at ``x``."""
    if spec.normal_distributed:
        return spec.mu + spec.sigma * math.sqrt(2) * erfinv(2 * x - 1)
    else:
        return spec.low + (spec.high - spec.low) * x


def _deformat_single_parameter(x, spec):
    """
    Run ``deformat_parameters()`` on a single value and return the resulting value.

    Infinite ``x`` is returned unchanged.
    """
    if math.isinf(x):
        return x
    # rebuild the spec with key `(name,)` so that it can be the only entry of a search space
    spec_dict = spec._asdict()
    spec_dict['key'] = (spec.name,)
    spec = ParameterSpec(**spec_dict)
    params = deformat_parameters({spec.key: x}, {spec.key: spec})
    return params[spec.name]


def _grid_size_info(grid):
    """
    Describe the size of ``grid`` for logging.

    Returns the number of candidates for a one-dimension grid,
    or a string like ``(2×3) = 6`` otherwise.
    """
    if len(grid) == 1:
        return str(len(grid[0]))
    sizes = [len(candidates) for candidates in grid]
    mul = '×'.join(str(s) for s in sizes)
    # multiply with Python integers, fixed-width integers silently overflow on huge grids
    total = 1
    for size in sizes:
        total *= size
    return f'({mul}) = {total}'