# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""
Grid search tuner.
For categorical parameters this tuner fully explores all combinations.
For numerical parameters it samples them at progressively finer intervals.
"""
__all__ = ['GridSearchTuner']
import logging
import math
import numpy as np
from scipy.special import erfinv # pylint: disable=no-name-in-module
import nni
from nni.common.hpo_utils import ParameterSpec, deformat_parameters, format_search_space
from nni.tuner import Tuner
_logger = logging.getLogger('nni.tuner.gridsearch')
##
# Grid search is a simple algorithm if only categorical parameters are considered.
# But to support continuous space, things get tricky.
#
# To support continuous space, we divide search process into "epochs".
# The first epoch only explores middle point of uniform and normal parameters.
# When first epoch is fully explored, the algorithm starts second epoch,
# where it divides non-categorical spaces by adding quartile points into the grid.
# Then in third epoch it adds [1/8, 3/8, 5/8, 7/8], and so on.
#
# We divide normally distributed spaces using the inverse of the CDF.
# For example the 1/4 point of a normal distribution is defined as X where `normal_cdf(X) = 1/4`.
#
# Here is an example:
#
# search space:
# x: choices(5, 7)
# y: normal(0, 1)
# z: quniform(2, 3, 1)
#
# grid of first epoch:
# x: [5, 7]
# y: [1/2]
# z: [1/2] (results in [2], because round(2.5) == 2)
# generated parameters:
# (5,0,2) (7,0,2)
#
# grid of second epoch:
# x: [5, 7]
# y: [1/2, 1/4, 3/4] (results in [0, -0.67, 0.67])
# z: [1/2, 3/4] (results in [2, 3], 1/4 is eliminated due to duplication)
# generated parameters:
# (5,0,3) (5,-0.67,2) (5,-0.67,3) (5,0.67,2) (5,0.67,3)
# (7,0,3) (7,-0.67,2) (7,-0.67,3) (7,0.67,2) (7,0.67,3)
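#
# The -0.67 above comes from the inverse CDF of normal(0, 1) (see _cdf_inverse below):
#   1/4 point = mu + sigma * sqrt(2) * erfinv(2 * 1/4 - 1)
#             = sqrt(2) * erfinv(-0.5) ≈ 1.4142 * (-0.4769) ≈ -0.674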
##
class GridSearchTuner(Tuner):
"""
Grid search tuner divides search space into evenly spaced grid, and performs brute-force traverse.
Recommended when the search space is small, or if you want to find strictly optimal hyperparameters.
**Implementation**
The original grid search approach performs an exhaustive search through a space consists of ``choice`` and ``randint``.
NNI's implementation extends grid search to support all search spaces types.
When the search space contains continuous parameters like ``normal`` and ``loguniform``,
grid search tuner works in following steps:
1. Divide the search space into a grid.
2. Perform an exhaustive searth through the grid.
3. Subdivide the grid into a finer-grained new grid.
4. Goto step 2, until experiment end.
As a deterministic algorithm, grid search has no argument.
Examples
--------
.. code-block::
config.tuner.name = 'GridSearch'
"""
def __init__(self, optimize_mode=None):
self.space = None
# the grid to search in this epoch
# when the space is fully explored, grid is set to None
self.grid = None # list[int | float]
        # a parameter set is internally expressed as a vector
# for each dimension i, self.vector[i] is the parameter's index in self.grid[i]
# in second epoch of above example, vector [1, 2, 0] means parameters {x: 7, y: 0.67, z: 2}
self.vector = None # list[int]
        # this tells, for each dimension, how many grid points were inherited from previous epochs
# in second epoch of above example, epoch_bar is [2, 1, 1]
self.epoch_bar = None # list[int]
# this stores which intervals are possibly divisible (low < high after log and q)
# in first epoch of above example, divisions are:
# {1: [(0,1/2), (1/2,1)], 2: [(1/2,1)]}
# in second epoch:
# {1: [(0,1/4), (1/4,1/2), (1/2,3/4), (3/4,1)], 2: [(1/2,3/4)]}
# and in third epoch:
# {1: [(0,1/8), ..., (7/8,1)], 2: []}
self.divisions = {} # dict[int, list[tuple[float, float]]]
# dumped JSON string of all tried parameters
self.history = set()
if optimize_mode is not None:
_logger.info(f'Ignored optimize_mode "{optimize_mode}"')
def update_search_space(self, space):
self.space = format_search_space(space)
if not self.space: # the tuner will crash in this case, report it explicitly
raise ValueError('Search space is empty')
self._init_grid()
def generate_parameters(self, *args, **kwargs):
while True:
params = self._suggest()
if params is None:
raise nni.NoMoreTrialError('Search space fully explored')
params = deformat_parameters(params, self.space)
params_str = nni.dump(params, sort_keys=True)
if params_str not in self.history:
self.history.add(params_str)
return params
def receive_trial_result(self, *args, **kwargs):
pass
def import_data(self, data):
# TODO
        # use tuples to dedup, in case order/precision issues cause matching to fail
# and remove `epoch_bar` to use uniform dedup mechanism
for trial in data:
params_str = nni.dump(trial['parameter'], sort_keys=True)
self.history.add(params_str)
def _suggest(self):
# returns next parameter set, or None if the space is already fully explored
while True:
if self.grid is None: # search space fully explored
return None
self._next_vector()
if self.vector is None: # epoch end, update grid and retry
self._next_grid()
continue
old = all((self.vector[i] < self.epoch_bar[i]) for i in range(len(self.space)))
if old: # already explored in past epochs
continue
# this vector is valid, stop
_logger.debug(f'vector: {self.vector}')
return self._current_parameters()
def _next_vector(self):
# iterate to next vector of this epoch, set vector to None if epoch end
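        # apart from the nested-choice handling, this behaves like an odometer whose last
        # activated dimension ticks fastest: e.g. with grid sizes [2, 3] the vectors go
        # [0, 0] -> [0, 1] -> [0, 2] -> [1, 0] -> [1, 1] -> [1, 2] -> None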
if self.vector is None: # first vector in this epoch
self.vector = [0] * len(self.space)
return
# deal with nested choice, don't touch nested spaces that are not chosen by current vector
activated_dims = []
params = self._current_parameters()
for i, spec in enumerate(self.space.values()):
if spec.is_activated_in(params):
activated_dims.append(i)
for i in reversed(activated_dims):
if self.vector[i] + 1 < len(self.grid[i]):
self.vector[i] += 1
return
else:
self.vector[i] = 0
self.vector = None # the loop ends without returning, no more vector in this epoch
def _next_grid(self):
# update grid information (grid, epoch_bar, divisions) for next epoch
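        # e.g. for `y` in the module-level example, moving from the first epoch to the second:
        # division (0, 1/2) has midpoint 1/4; both halves remain divisible, so 1/4 is appended
        # to the grid and (0, 1/2) is replaced by the sub-intervals (0, 1/4) and (1/4, 1/2)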
updated = False
for i, spec in enumerate(self.space.values()):
self.epoch_bar[i] = len(self.grid[i])
if not spec.categorical:
# further divide intervals
new_vals = [] # values to append to grid
new_divs = [] # sub-intervals
for l, r in self.divisions[i]:
mid = (l + r) / 2
diff_l = _less(l, mid, spec)
diff_r = _less(mid, r, spec)
# if l != 0 and r != 1, then they are already in the grid, else they are not
# the special case is needed because for normal distribution 0 and 1 will generate infinity
if (diff_l or l == 0.0) and (diff_r or r == 1.0):
# we can skip these for non-q, but it will complicate the code
new_vals.append(mid)
updated = True
if diff_l:
new_divs.append((l, mid))
updated = (updated or l == 0.0)
if diff_r:
new_divs.append((mid, r))
updated = (updated or r == 1.0)
self.grid[i] += new_vals
self.divisions[i] = new_divs
if not updated: # fully explored
_logger.info('Search space has been fully explored')
self.grid = None
else:
size = _grid_size_info(self.grid)
_logger.info(f'Grid subdivided, new size: {size}')
def _init_grid(self):
self.epoch_bar = [0 for _ in self.space]
self.grid = [None for _ in self.space]
for i, spec in enumerate(self.space.values()):
if spec.categorical:
self.grid[i] = list(range(spec.size))
else:
self.grid[i] = [0.5]
self.divisions[i] = []
if _less(0, 0.5, spec):
self.divisions[i].append((0, 0.5))
if _less(0.5, 1, spec):
self.divisions[i].append((0.5, 1))
size = _grid_size_info(self.grid)
_logger.info(f'Grid initialized, size: {size}')
def _current_parameters(self):
# convert self.vector to "formatted" parameters
params = {}
for i, spec in enumerate(self.space.values()):
if spec.is_activated_in(params):
x = self.grid[i][self.vector[i]]
if spec.categorical:
params[spec.key] = x
else:
params[spec.key] = _cdf_inverse(x, spec)
return params
def _less(x, y, spec):
    #if spec.q is None: # TODO: commented out due to edge case UT uniform(99.9, 99.9)
# return x < y
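    # example: for quniform(2, 3, 1) the 1/4 and 1/2 points both deformat to 2
    # (round(2.25) == round(2.5) == 2), so _less(0.25, 0.5, spec) is False and the
    # duplicate grid point is never generated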
real_x = _deformat_single_parameter(_cdf_inverse(x, spec), spec)
real_y = _deformat_single_parameter(_cdf_inverse(y, spec), spec)
return real_x < real_y
def _cdf_inverse(x, spec):
# inverse function of spec's cumulative distribution function
if spec.normal_distributed:
return spec.mu + spec.sigma * math.sqrt(2) * erfinv(2 * x - 1)
else:
return spec.low + (spec.high - spec.low) * x
def _deformat_single_parameter(x, spec):
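    # convert a single "formatted" value to its user-facing value (applying e.g. q rounding and clipping)
    # by reusing deformat_parameters on a one-parameter search space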
if math.isinf(x):
return x
spec_dict = spec._asdict()
spec_dict['key'] = (spec.name,)
spec = ParameterSpec(**spec_dict)
params = deformat_parameters({spec.key: x}, {spec.key: spec})
return params[spec.name]
def _grid_size_info(grid):
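    # e.g. the first-epoch grid of the module-level example, [[0, 1], [0.5], [0.5]], gives '(2×1×1) = 2'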
if len(grid) == 1:
return str(len(grid[0]))
sizes = [len(candidates) for candidates in grid]
mul = '×'.join(str(s) for s in sizes)
total = np.prod(sizes)
return f'({mul}) = {total}'