# Source code of nni.algorithms.hpo.gridsearch_tuner

# Copyright (c) Microsoft Corporation.

"""
Grid search tuner.

For categorical parameters this tuner fully explores all combinations.
For numerical parameters it samples them at progressively decreased intervals.
"""

__all__ = ['GridSearchTuner']

import logging
import math

import numpy as np
from scipy.special import erfinv  # pylint: disable=no-name-in-module

import nni
from nni.common.hpo_utils import ParameterSpec, deformat_parameters, format_search_space
from nni.tuner import Tuner

_logger = logging.getLogger('nni.tuner.gridsearch')

##
# Grid search is a simple algorithm if only categorical parameters are considered.
# But to support continuous space, things get tricky.
#
# To support continuous space, we divide search process into "epochs".
# The first epoch only explores middle point of uniform and normal parameters.
# When first epoch is fully explored, the algorithm starts second epoch,
# where it divides non-categorical spaces by adding quartile points into the grid.
# Then in third epoch it adds [1/8, 3/8, 5/8, 7/8], and so on.
#
# We divide normal distributed spaces using inverse function of CDF.
# For example the 1/4 point of a normal distribution is defined as X where `normal_cdf(X) = 1/4`.
#
# Here is an example:
#
#   search space:
#     x: choices(5, 7)
#     y: normal(0, 1)
#     z: quniform(2, 3, 1)
#
#   grid of first epoch:
#     x: [5, 7]
#     y: [1/2]
#     z: [1/2]  (results in [2], because round(2.5) == 2)
#   generated parameters:
#     (5,0,2) (7,0,2)
#
#   grid of second epoch:
#     x: [5, 7]
#     y: [1/2, 1/4, 3/4]  (results in [0, -0.67, 0.67])
#     z: [1/2, 3/4]  (results in [2, 3], 1/4 is eliminated due to duplication)
#   generated parameters:
#     (5,0,3)    (5,-0.67,2) (5,-0.67,3)    (5,0.67,2) (5,0.67,3)
#     (7,0,3)    (7,-0.67,2) (7,-0.67,3)    (7,0.67,2) (7,0.67,3)
##

class GridSearchTuner(Tuner):
    """
    Grid search tuner divides search space into evenly spaced grid, and performs brute-force traverse.

    Recommended when the search space is small, or if you want to find strictly optimal hyperparameters.

    **Implementation**

    The original grid search approach performs an exhaustive search through a space consisting of ``choice`` and ``randint``.

    NNI's implementation extends grid search to support all search space types.

    When the search space contains continuous parameters like ``normal`` and ``loguniform``,
    grid search tuner works in following steps:

    1. Divide the search space into a grid.
    2. Perform an exhaustive search through the grid.
    3. Subdivide the grid into a finer-grained new grid.
    4. Goto step 2, until experiment end.

    As a deterministic algorithm, grid search has no argument.

    Examples
    --------

    .. code-block::

        config.tuner.name = 'GridSearch'
    """

    def __init__(self, optimize_mode=None):
        # grid search is deterministic and never reads trial results,
        # so `optimize_mode` has no effect; it is accepted only for
        # interface compatibility with other tuners (and logged below)
        self.space = None

        # the grid to search in this epoch
        # when the space is fully explored, grid is set to None
        self.grid = None  # list[list[int | float]]

        # a parameter set is internally expressed as a vector
        # for each dimension i, self.vector[i] is the parameter's index in self.grid[i]
        # in second epoch of above example, vector [1, 2, 0] means parameters {x: 7, y: 0.67, z: 2}
        self.vector = None  # list[int]

        # this tells which parameters are derived from previous epoch
        # in second epoch of above example, epoch_bar is [2, 1, 1]
        self.epoch_bar = None  # list[int]

        # this stores which intervals are possibly divisible (low < high after log and q)
        # in first epoch of above example, divisions are:
        #     {1: [(0,1/2), (1/2,1)], 2: [(1/2,1)]}
        # in second epoch:
        #     {1: [(0,1/4), (1/4,1/2), (1/2,3/4), (3/4,1)], 2: [(1/2,3/4)]}
        # and in third epoch:
        #     {1: [(0,1/8), ..., (7/8,1)], 2: []}
        self.divisions = {}  # dict[int, list[tuple[float, float]]]

        # dumped JSON strings of all tried parameters
        self.history = set()

        if optimize_mode is not None:
            _logger.info(f'Ignored optimize_mode "{optimize_mode}"')

    def update_search_space(self, space):
        self.space = format_search_space(space)
        if not self.space:  # the tuner will crash in this case, report it explicitly
            raise ValueError('Search space is empty')
        self._init_grid()

    def generate_parameters(self, *args, **kwargs):
        # loop until a parameter set not present in history is found
        while True:
            params = self._suggest()
            if params is None:
                raise nni.NoMoreTrialError('Search space fully explored')
            params = deformat_parameters(params, self.space)

            params_str = nni.dump(params, sort_keys=True)
            if params_str not in self.history:
                # record it so the same parameters are never generated twice
                self.history.add(params_str)
                return params

    def import_data(self, data):
        # TODO
        # use tuple to dedup in case of order/precision issue causes matching failed
        # and remove `epoch_bar` to use uniform dedup mechanism
        for trial in data:
            params_str = nni.dump(trial['parameter'], sort_keys=True)
            # remember imported trials so generate_parameters() skips them
            self.history.add(params_str)

    def _suggest(self):
        # returns next parameter set, or None if the space is already fully explored
        while True:
            if self.grid is None:  # search space fully explored
                return None

            self._next_vector()

            if self.vector is None:  # epoch end, update grid and retry
                self._next_grid()
                continue

            old = all((self.vector[i] < self.epoch_bar[i]) for i in range(len(self.space)))
            if old:  # already explored in past epochs
                continue

            # this vector is valid, stop
            _logger.debug(f'vector: {self.vector}')
            return self._current_parameters()

    def _next_vector(self):
        # iterate to next vector of this epoch, set vector to None if epoch end
        if self.vector is None:  # first vector in this epoch
            self.vector = [0] * len(self.space)
            return

        # deal with nested choice, don't touch nested spaces that are not chosen by current vector
        activated_dims = []
        params = self._current_parameters()
        for i, spec in enumerate(self.space.values()):
            if spec.is_activated_in(params):
                activated_dims.append(i)

        # odometer-style increment over activated dimensions, least significant last
        for i in reversed(activated_dims):
            if self.vector[i] + 1 < len(self.grid[i]):
                self.vector[i] += 1
                return
            else:
                self.vector[i] = 0

        self.vector = None  # the loop ends without returning, no more vector in this epoch

    def _next_grid(self):
        # update grid information (grid, epoch_bar, divisions) for next epoch
        updated = False
        for i, spec in enumerate(self.space.values()):
            self.epoch_bar[i] = len(self.grid[i])
            if not spec.categorical:
                # further divide intervals
                new_vals = []  # values to append to grid
                new_divs = []  # sub-intervals
                for l, r in self.divisions[i]:
                    mid = (l + r) / 2
                    diff_l = _less(l, mid, spec)
                    diff_r = _less(mid, r, spec)
                    # if l != 0 and r != 1, then they are already in the grid, else they are not
                    # the special case is needed because for normal distribution 0 and 1 will generate infinity
                    if (diff_l or l == 0.0) and (diff_r or r == 1.0):
                        # we can skip these for non-q, but it will complicate the code
                        new_vals.append(mid)
                        updated = True
                    if diff_l:
                        new_divs.append((l, mid))
                        updated = (updated or l == 0.0)
                    if diff_r:
                        new_divs.append((mid, r))
                        updated = (updated or r == 1.0)
                self.grid[i] += new_vals
                self.divisions[i] = new_divs

        if not updated:  # fully explored
            _logger.info('Search space has been fully explored')
            self.grid = None
        else:
            size = _grid_size_info(self.grid)
            _logger.info(f'Grid subdivided, new size: {size}')

    def _init_grid(self):
        self.epoch_bar = [0 for _ in self.space]
        self.grid = [None for _ in self.space]
        for i, spec in enumerate(self.space.values()):
            if spec.categorical:
                # categorical dimensions are fully enumerated from the start
                self.grid[i] = list(range(spec.size))
            else:
                # numerical dimensions start from the middle point only
                self.grid[i] = [0.5]
                self.divisions[i] = []
                if _less(0, 0.5, spec):
                    self.divisions[i].append((0, 0.5))
                if _less(0.5, 1, spec):
                    self.divisions[i].append((0.5, 1))

        size = _grid_size_info(self.grid)
        _logger.info(f'Grid initialized, size: {size}')

    def _current_parameters(self):
        # convert self.vector to "formatted" parameters
        # NOTE: self.space is sorted by tree order,
        #       so a nested spec is guaranteed to check is_activated_in() after its parent
        params = {}
        for i, spec in enumerate(self.space.values()):
            if spec.is_activated_in(params):
                x = self.grid[i][self.vector[i]]
                if spec.categorical:
                    params[spec.key] = x
                else:
                    params[spec.key] = _cdf_inverse(x, spec)
        return params

def _less(x, y, spec):
    # Return True if grid points ``x`` and ``y`` (both in [0, 1]) map to
    # distinct real parameter values under ``spec``'s distribution (after
    # log and q rounding). Used to decide whether an interval is divisible.
    #if spec.q is None:  # TODO: comment out because of edge case UT uniform(99.9, 99.9)
    #    return x < y
    real_x = _deformat_single_parameter(_cdf_inverse(x, spec), spec)
    real_y = _deformat_single_parameter(_cdf_inverse(y, spec), spec)
    return real_x < real_y

def _cdf_inverse(x, spec):
# inverse function of spec's cumulative distribution function
if spec.normal_distributed:
return spec.mu + spec.sigma * math.sqrt(2) * erfinv(2 * x - 1)
else:
return spec.low + (spec.high - spec.low) * x

def _deformat_single_parameter(x, spec):
    # Convert one "formatted" value into its user-facing form by reusing
    # deformat_parameters() on a single-parameter search space.
    if math.isinf(x):
        # infinities come from _cdf_inverse() at 0/1 of a normal distribution;
        # pass them through unchanged, comparisons in _less() still work
        return x
    # rebuild the spec with a top-level key path so the deformatted value
    # lands directly under spec.name in the result dict
    spec_dict = spec._asdict()
    spec_dict['key'] = (spec.name,)
    spec = ParameterSpec(**spec_dict)
    params = deformat_parameters({spec.key: x}, {spec.key: spec})
    return params[spec.name]

def _grid_size_info(grid):
if len(grid) == 1:
return str(len(grid))
sizes = [len(candidates) for candidates in grid]
mul = '×'.join(str(s) for s in sizes)
total = np.prod(sizes)
return f'({mul}) = {total}'
```