Source code for nni.utils

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import copy
from enum import Enum, unique
from pathlib import Path
from schema import And

from . import parameter_expressions


@unique
class OptimizeMode(Enum):
    """Optimize Mode class

    if OptimizeMode is 'minimize', it means the tuner need to minimize the reward
    that received from Trial.

    if OptimizeMode is 'maximize', it means the tuner need to maximize the reward
    that received from Trial.
    """
    Minimize = 'minimize'
    Maximize = 'maximize'


class NodeType:
    """Node Type class
    """
    ROOT = 'root'
    TYPE = '_type'
    VALUE = '_value'
    INDEX = '_index'
    NAME = '_name'


class MetricType:
    """The types of metric data
    """
    FINAL = 'FINAL'
    PERIODICAL = 'PERIODICAL'
    REQUEST_PARAMETER = 'REQUEST_PARAMETER'


def split_index(params):
    """
    Delete index infromation from params
    """
    if isinstance(params, dict):
        if NodeType.INDEX in params.keys():
            return split_index(params[NodeType.VALUE])
        result = {}
        for key in params:
            result[key] = split_index(params[key])
        return result
    else:
        return params


def extract_scalar_reward(value, scalar_key='default'):
    """
    Extract scalar reward from trial result.

    Parameters
    ----------
    value : int, float, dict
        the reported final metric data
    scalar_key : str
        the key name that indicates the numeric number

    Raises
    ------
    RuntimeError
        Incorrect final result: the final result should be float/int,
        or a dict which has a key named "default" whose value is float/int.
    """
    if isinstance(value, (float, int)):
        reward = value
    elif isinstance(value, dict) and scalar_key in value and isinstance(value[scalar_key], (float, int)):
        reward = value[scalar_key]
    else:
        raise RuntimeError('Incorrect final result: the final result should be float/int, ' \
            'or a dict which has a key named "default" whose value is float/int.')
    return reward


def extract_scalar_history(trial_history, scalar_key='default'):
    """
    Extract scalar value from a list of intermediate results.

    Parameters
    ----------
    trial_history : list
        accumulated intermediate results of a trial
    scalar_key : str
        the key name that indicates the numeric number

    Raises
    ------
    RuntimeError
        Incorrect final result: the final result should be float/int,
        or a dict which has a key named "default" whose value is float/int.
    """
    return [extract_scalar_reward(ele, scalar_key) for ele in trial_history]


def convert_dict2tuple(value):
    """
    convert dict type to tuple to solve unhashable problem.
    NOTE: this function will change original data.
    """
    if isinstance(value, dict):
        for _keys in value:
            value[_keys] = convert_dict2tuple(value[_keys])
        return tuple(sorted(value.items()))
    return value


def json2space(x, oldy=None, name=NodeType.ROOT):
    """
    Change search space from json format to hyperopt format

    """
    y = list()
    if isinstance(x, dict):
        if NodeType.TYPE in x.keys():
            _type = x[NodeType.TYPE]
            name = name + '-' + _type
            if _type == 'choice':
                if oldy is not None:
                    _index = oldy[NodeType.INDEX]
                    y += json2space(x[NodeType.VALUE][_index],
                                    oldy[NodeType.VALUE], name=name+'[%d]' % _index)
                else:
                    y += json2space(x[NodeType.VALUE], None, name=name)
            y.append(name)
        else:
            for key in x.keys():
                y += json2space(x[key], oldy[key] if oldy else None, name+"[%s]" % str(key))
    elif isinstance(x, list):
        for i, x_i in enumerate(x):
            if isinstance(x_i, dict):
                if NodeType.NAME not in x_i.keys():
                    raise RuntimeError('\'_name\' key is not found in this nested search space.')
            y += json2space(x_i, oldy[i] if oldy else None, name + "[%d]" % i)
    return y


def json2parameter(x, is_rand, random_state, oldy=None, Rand=False, name=NodeType.ROOT):
    """
    Json to pramaters.

    """
    if isinstance(x, dict):
        if NodeType.TYPE in x.keys():
            _type = x[NodeType.TYPE]
            _value = x[NodeType.VALUE]
            name = name + '-' + _type
            Rand |= is_rand[name]
            if Rand is True:
                if _type == 'choice':
                    _index = random_state.randint(len(_value))
                    y = {
                        NodeType.INDEX: _index,
                        NodeType.VALUE: json2parameter(
                            x[NodeType.VALUE][_index],
                            is_rand,
                            random_state,
                            None,
                            Rand,
                            name=name+"[%d]" % _index
                        )
                    }
                else:
                    y = getattr(parameter_expressions, _type)(*(_value + [random_state]))
            else:
                y = copy.deepcopy(oldy)
        else:
            y = dict()
            for key in x.keys():
                y[key] = json2parameter(
                    x[key],
                    is_rand,
                    random_state,
                    oldy[key] if oldy else None,
                    Rand,
                    name + "[%s]" % str(key)
                )
    elif isinstance(x, list):
        y = list()
        for i, x_i in enumerate(x):
            if isinstance(x_i, dict):
                if NodeType.NAME not in x_i.keys():
                    raise RuntimeError('\'_name\' key is not found in this nested search space.')
            y.append(json2parameter(
                x_i,
                is_rand,
                random_state,
                oldy[i] if oldy else None,
                Rand,
                name + "[%d]" % i
            ))
    else:
        y = copy.deepcopy(x)
    return y

[docs]def merge_parameter(base_params, override_params): """ Update the parameters in ``base_params`` with ``override_params``. Can be useful to override parsed command line arguments. Parameters ---------- base_params : namespace or dict Base parameters. A key-value mapping. override_params : dict or None Parameters to override. Usually the parameters got from ``get_next_parameters()``. When it is none, nothing will happen. Returns ------- namespace or dict The updated ``base_params``. Note that ``base_params`` will be updated inplace. The return value is only for convenience. """ if override_params is None: return base_params is_dict = isinstance(base_params, dict) for k, v in override_params.items(): if is_dict: if k not in base_params: raise ValueError('Key \'%s\' not found in base parameters.' % k) if type(base_params[k]) != type(v) and base_params[k] is not None: raise TypeError('Expected \'%s\' in override parameters to have type \'%s\', but found \'%s\'.' % (k, type(base_params[k]), type(v))) base_params[k] = v else: if not hasattr(base_params, k): raise ValueError('Key \'%s\' not found in base parameters.' % k) if type(getattr(base_params, k)) != type(v) and getattr(base_params, k) is not None: raise TypeError('Expected \'%s\' in override parameters to have type \'%s\', but found \'%s\'.' % (k, type(getattr(base_params, k)), type(v))) setattr(base_params, k, v) return base_params
class ClassArgsValidator(object): """ NNI tuners/assessors/adivisors accept a `classArgs` parameter in experiment configuration file. This ClassArgsValidator interface is used to validate the classArgs section in exeperiment configuration file. """ def validate_class_args(self, **kwargs): """ Validate the classArgs configuration in experiment configuration file. Parameters ---------- kwargs: dict kwargs passed to tuner/assessor/advisor constructor Raises: Raise an execption if the kwargs is invalid. """ pass def choices(self, key, *args): """ Utility method to create a scheme to check whether the `key` is one of the `args`. Parameters: ---------- key: str key name of the data to be validated args: list of str list of the choices Returns: Schema -------- A scheme to check whether the `key` is one of the `args`. """ return And(lambda n: n in args, error='%s should be in [%s]!' % (key, str(args))) def range(self, key, keyType, start, end): """ Utility method to create a schema to check whether the `key` is in the range of [start, end]. Parameters: ---------- key: str key name of the data to be validated keyType: type python data type, such as int, float start: type is specified by keyType start of the range end: type is specified by keyType end of the range Returns: Schema -------- A scheme to check whether the `key` is in the range of [start, end]. """ return And( And(keyType, error='%s should be %s type!' % (key, keyType.__name__)), And(lambda n: start <= n <= end, error='%s should be in range of (%s, %s)!' % (key, start, end)) ) def path(self, key): return And( And(str, error='%s should be a string!' % key), And(lambda p: Path(p).exists(), error='%s path does not exist!' % (key)) )