# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""
ppo_tuner.py including:
class PPOTuner
"""
import copy
import logging
import numpy as np
from gym import spaces
from schema import Schema, Optional
import nni
from nni import ClassArgsValidator
from nni.tuner import Tuner
from nni.utils import OptimizeMode, extract_scalar_reward
from .model import Model
from .util import set_global_seeds
from .policy import build_lstm_policy
logger = logging.getLogger('ppo_tuner_AutoML')
def _constfn(val):
"""
Wrap as function
"""
def f(_):
return val
return f
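# For example, _constfn(3e-4) returns a function f with f(frac) == 3e-4 for any frac;
# this is how the constant lr and cliprange schedules are represented in PPOModel below.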
class ModelConfig:
"""
Configurations of the PPO model
"""
def __init__(self):
self.observation_space = None
self.action_space = None
self.num_envs = 0
self.nsteps = 0
self.ent_coef = 0.0
self.lr = 3e-4
self.vf_coef = 0.5
self.max_grad_norm = 0.5
self.gamma = 0.99
self.lam = 0.95
self.cliprange = 0.2
self.embedding_size = None # the embedding is for each action
self.noptepochs = 4 # number of training epochs per update
self.total_timesteps = 5000 # number of timesteps (i.e. number of actions taken in the environment)
        self.nminibatches = 4          # number of training minibatches per update; for recurrent policies,
                                       # it should be smaller than or equal to the number of environments run in parallel
class TrialsInfo:
"""
    Information about the trials generated by one model inference
"""
def __init__(self, obs, actions, values, neglogpacs, dones, last_value, inf_batch_size):
self.iter = 0
self.obs = obs
self.actions = actions
self.values = values
self.neglogpacs = neglogpacs
self.dones = dones
self.last_value = last_value
self.rewards = None
self.returns = None
self.inf_batch_size = inf_batch_size
#self.states = None
def get_next(self):
"""
Get actions of the next trial
"""
if self.iter >= self.inf_batch_size:
return None, None
actions = []
for step in self.actions:
actions.append(step[self.iter])
self.iter += 1
return self.iter - 1, actions
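    # Illustrative example of get_next() (hypothetical values): with inf_batch_size=2 and
    # self.actions = [[a0, b0], [a1, b1]] (one array per step), successive calls return
    # (0, [a0, a1]) and then (1, [b0, b1]), i.e. the full action sequence of one trial at a
    # time, and (None, None) once all inf_batch_size trials have been handed out.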
def update_rewards(self, rewards, returns):
"""
        After the trials are finished, the rewards and returns of these trials are updated
"""
self.rewards = rewards
self.returns = returns
def convert_shape(self):
"""
        Flatten the step and trial-batch dimensions of the stored arrays
"""
def sf01(arr):
"""
swap and then flatten axes 0 and 1
"""
s = arr.shape
return arr.swapaxes(0, 1).reshape(s[0] * s[1], *s[2:])
self.obs = sf01(self.obs)
self.returns = sf01(self.returns)
self.dones = sf01(self.dones)
self.actions = sf01(self.actions)
self.values = sf01(self.values)
self.neglogpacs = sf01(self.neglogpacs)
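        # After this call, each array goes from shape (nsteps, inf_batch_size, ...) to
        # (inf_batch_size * nsteps, ...), ordered trial by trial (all steps of one trial are
        # contiguous), which matches the flat indexing (flatinds) used in PPOModel.train().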
class PPOModel:
"""
PPO Model
"""
def __init__(self, model_config, mask):
self.model_config = model_config
self.states = None # initial state of lstm in policy/value network
        self.nupdates = None           # total number of times train() will be invoked, used to schedule lr and cliprange
self.cur_update = 1 # record the current update
self.np_mask = mask # record the mask of each action within one trial
set_global_seeds(None)
assert isinstance(self.model_config.lr, float)
self.lr = _constfn(self.model_config.lr)
assert isinstance(self.model_config.cliprange, float)
self.cliprange = _constfn(self.model_config.cliprange)
# build lstm policy network, value share the same network
policy = build_lstm_policy(model_config)
        # Get the number of environments
nenvs = model_config.num_envs
# Calculate the batch_size
self.nbatch = nbatch = nenvs * model_config.nsteps # num of record per update
nbatch_train = nbatch // model_config.nminibatches # get batch size
# self.nupdates is used to tune lr and cliprange
self.nupdates = self.model_config.total_timesteps // self.nbatch
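        # For example (illustrative numbers only): with num_envs=20, nsteps=10 and
        # nminibatches=4, nbatch = 200 records are collected per update, each training
        # minibatch covers nbatch_train = 50 of them, and nupdates = 5000 // 200 = 25.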
# Instantiate the model object (that creates act_model and train_model)
self.model = Model(policy=policy, nbatch_act=nenvs, nbatch_train=nbatch_train,
nsteps=model_config.nsteps, ent_coef=model_config.ent_coef, vf_coef=model_config.vf_coef,
max_grad_norm=model_config.max_grad_norm, np_mask=self.np_mask)
self.states = self.model.initial_state
logger.info('=== finished PPOModel initialization')
def inference(self, num):
"""
Generate actions along with related info from policy network.
        The observation at each step is the action taken at the previous step.
Parameters
----------
num: int
The number of trials to generate
Returns
-------
mb_obs : list
Observation of the ``num`` configurations
mb_actions : list
Actions of the ``num`` configurations
mb_values : list
Values from the value function of the ``num`` configurations
mb_neglogpacs : list
``neglogp`` of the ``num`` configurations
mb_dones : list
            Done flags of the ``num`` configurations; ``True`` only for the first step of each trial
        last_values : tensorflow tensor
            The last values of the ``num`` configurations, obtained from a session run
"""
        # Here, we initialize the lists that will contain the minibatch of experiences
mb_obs, mb_actions, mb_values, mb_dones, mb_neglogpacs = [], [], [], [], []
# initial observation
# use the (n+1)th embedding to represent the first step action
first_step_ob = self.model_config.action_space.n
obs = [first_step_ob for _ in range(num)]
dones = [True for _ in range(num)]
states = self.states
# For n in range number of steps
for cur_step in range(self.model_config.nsteps):
            # Given observations, get actions, values, states, and neglogpacs from the policy network
actions, values, states, neglogpacs = self.model.step(cur_step, obs, S=states, M=dones)
mb_obs.append(obs.copy())
mb_actions.append(actions)
mb_values.append(values)
mb_neglogpacs.append(neglogpacs)
mb_dones.append(dones)
            # The actions of this step become the observations of the next step
obs[:] = actions
if cur_step == self.model_config.nsteps - 1:
dones = [True for _ in range(num)]
else:
dones = [False for _ in range(num)]
        # convert the batch of steps to a batch of rollouts
np_obs = np.asarray(obs)
mb_obs = np.asarray(mb_obs, dtype=np_obs.dtype)
mb_actions = np.asarray(mb_actions)
mb_values = np.asarray(mb_values, dtype=np.float32)
mb_neglogpacs = np.asarray(mb_neglogpacs, dtype=np.float32)
        mb_dones = np.asarray(mb_dones, dtype=bool)
last_values = self.model.value(np_obs, S=states, M=dones)
return mb_obs, mb_actions, mb_values, mb_neglogpacs, mb_dones, last_values
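    # A hedged note on shapes: assuming self.model.step returns one action/value per
    # configuration, each mb_* array above has shape (nsteps, num, ...) and last_values
    # has shape (num,); TrialsInfo keeps this step-major layout until convert_shape().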
def compute_rewards(self, trials_info, trials_result):
"""
Compute the rewards of the trials in trials_info based on trials_result,
and update the rewards in trials_info
Parameters
----------
trials_info : TrialsInfo
Info of the generated trials
trials_result : list
Final results (e.g., acc) of the generated trials
"""
mb_rewards = np.asarray([trials_result for _ in trials_info.actions], dtype=np.float32)
# discount/bootstrap off value fn
mb_returns = np.zeros_like(mb_rewards)
mb_advs = np.zeros_like(mb_rewards)
lastgaelam = 0
        last_dones = np.asarray([True for _ in trials_result], dtype=bool)
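        # Generalized advantage estimation (GAE), computed backwards over the steps:
        #   delta_t = r_t + gamma * V_{t+1} * (1 - done_{t+1}) - V_t
        #   A_t     = delta_t + gamma * lam * (1 - done_{t+1}) * A_{t+1}
        # and the returns used as value targets are returns_t = A_t + V_t (see below).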
for t in reversed(range(self.model_config.nsteps)):
if t == self.model_config.nsteps - 1:
nextnonterminal = 1.0 - last_dones
nextvalues = trials_info.last_value
else:
nextnonterminal = 1.0 - trials_info.dones[t+1]
nextvalues = trials_info.values[t+1]
delta = mb_rewards[t] + self.model_config.gamma * nextvalues * nextnonterminal - trials_info.values[t]
lastgaelam = delta + self.model_config.gamma * self.model_config.lam * nextnonterminal * lastgaelam
mb_advs[t] = lastgaelam # pylint: disable=unsupported-assignment-operation
mb_returns = mb_advs + trials_info.values
trials_info.update_rewards(mb_rewards, mb_returns)
trials_info.convert_shape()
def train(self, trials_info, nenvs):
"""
Train the policy/value network using trials_info
Parameters
----------
trials_info : TrialsInfo
Complete info of the generated trials from the previous inference
nenvs : int
The batch size of the (previous) inference
"""
# keep frac decay for future optimization
if self.cur_update <= self.nupdates:
frac = 1.0 - (self.cur_update - 1.0) / self.nupdates
else:
logger.warning('current update (self.cur_update) %d has exceeded total updates (self.nupdates) %d',
self.cur_update, self.nupdates)
frac = 1.0 - (self.nupdates - 1.0) / self.nupdates
lrnow = self.lr(frac)
cliprangenow = self.cliprange(frac)
self.cur_update += 1
states = self.states
assert states is not None # recurrent version
assert nenvs % self.model_config.nminibatches == 0
envsperbatch = nenvs // self.model_config.nminibatches
envinds = np.arange(nenvs)
flatinds = np.arange(nenvs * self.model_config.nsteps).reshape(nenvs, self.model_config.nsteps)
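        # Illustrative indexing example (hypothetical sizes): with nenvs=4, nsteps=3 and
        # nminibatches=2, flatinds is [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]; picking
        # two trial (env) rows and ravelling them selects all steps of those trials from the
        # arrays flattened by TrialsInfo.convert_shape().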
for _ in range(self.model_config.noptepochs):
np.random.shuffle(envinds)
for start in range(0, nenvs, envsperbatch):
end = start + envsperbatch
mbenvinds = envinds[start:end]
mbflatinds = flatinds[mbenvinds].ravel()
slices = (arr[mbflatinds] for arr in (trials_info.obs, trials_info.returns, trials_info.dones,
trials_info.actions, trials_info.values, trials_info.neglogpacs))
mbstates = states[mbenvinds]
self.model.train(lrnow, cliprangenow, *slices, mbstates)
class PPOClassArgsValidator(ClassArgsValidator):
def validate_class_args(self, **kwargs):
Schema({
'optimize_mode': self.choices('optimize_mode', 'maximize', 'minimize'),
Optional('trials_per_update'): self.range('trials_per_update', int, 0, 99999),
Optional('epochs_per_update'): self.range('epochs_per_update', int, 0, 99999),
Optional('minibatch_size'): self.range('minibatch_size', int, 0, 99999),
Optional('ent_coef'): float,
Optional('lr'): float,
Optional('vf_coef'): float,
Optional('max_grad_norm'): float,
Optional('gamma'): float,
Optional('lam'): float,
Optional('cliprange'): float,
}).validate(kwargs)
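# A hedged illustration (not executed): classArgs such as
#     {'optimize_mode': 'maximize', 'trials_per_update': 20, 'minibatch_size': 4}
# would pass the validation above; 'optimize_mode' is required, the other keys are optional.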
class PPOTuner(Tuner):
"""
    PPOTuner. The implementation inherits the main logic of
    `ppo2 from OpenAI <https://github.com/openai/baselines/tree/master/baselines/ppo2>`__ and is adapted to the NAS scenario.
    It uses an ``lstm`` policy network and value network; the policy and the value function share the same network.
Parameters
----------
optimize_mode : str
maximize or minimize
trials_per_update : int
Number of trials to have for each model update
epochs_per_update : int
Number of epochs to run for each model update
minibatch_size : int
Minibatch size (number of trials) for the update
ent_coef : float
Policy entropy coefficient in the optimization objective
lr : float
Learning rate of the model (lstm network), constant
vf_coef : float
Value function loss coefficient in the optimization objective
max_grad_norm : float
Gradient norm clipping coefficient
gamma : float
Discounting factor
lam : float
Advantage estimation discounting factor (lambda in the paper)
cliprange : float
Cliprange in the PPO algorithm, constant
"""
def __init__(self, optimize_mode, trials_per_update=20, epochs_per_update=4, minibatch_size=4,
ent_coef=0.0, lr=3e-4, vf_coef=0.5, max_grad_norm=0.5, gamma=0.99, lam=0.95, cliprange=0.2):
self.optimize_mode = OptimizeMode(optimize_mode)
self.model_config = ModelConfig()
self.model = None
self.search_space = None
self.running_trials = {} # key: parameter_id, value: actions/states/etc.
self.inf_batch_size = trials_per_update # number of trials to generate in one inference
self.first_inf = True # indicate whether it is the first time to inference new trials
self.trials_result = [None for _ in range(self.inf_batch_size)] # results of finished trials
self.credit = 0 # record the unsatisfied trial requests
self.param_ids = []
self.finished_trials = 0
self.chosen_arch_template = {}
self.actions_spaces = None
self.actions_to_config = None
self.full_act_space = None
self.trials_info = None
self.all_trials = {} # used to dedup the same trial, key: config, value: final result
self.model_config.num_envs = self.inf_batch_size
self.model_config.noptepochs = epochs_per_update
self.model_config.nminibatches = minibatch_size
self.send_trial_callback = None
logger.info('Finished PPOTuner initialization')
def _process_nas_space(self, search_space):
actions_spaces = []
actions_to_config = []
for key, val in search_space.items():
if val['_type'] == 'layer_choice':
actions_to_config.append((key, 'layer_choice'))
actions_spaces.append(val['_value'])
self.chosen_arch_template[key] = None
elif val['_type'] == 'input_choice':
candidates = val['_value']['candidates']
n_chosen = val['_value']['n_chosen']
if n_chosen not in [0, 1, [0, 1]]:
                    raise ValueError('n_chosen can only be 0, 1, or [0, 1], but the specified value is %s'
                                     % (n_chosen))
if isinstance(n_chosen, list):
actions_to_config.append((key, 'input_choice'))
# FIXME: risk, candidates might also have None
actions_spaces.append(['None', *candidates])
self.chosen_arch_template[key] = None
elif n_chosen == 1:
actions_to_config.append((key, 'input_choice'))
actions_spaces.append(candidates)
self.chosen_arch_template[key] = None
elif n_chosen == 0:
self.chosen_arch_template[key] = []
else:
raise ValueError('Unsupported search space type: %s' % (val['_type']))
# calculate observation space
dedup = {}
for step in actions_spaces:
for action in step:
dedup[action] = 1
full_act_space = [act for act, _ in dedup.items()]
assert len(full_act_space) == len(dedup)
observation_space = len(full_act_space)
nsteps = len(actions_spaces)
return actions_spaces, actions_to_config, full_act_space, observation_space, nsteps
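    # A hedged example with a hypothetical search space: given
    #     {'conv1': {'_type': 'layer_choice', '_value': ['conv3x3', 'conv5x5']}}
    # _process_nas_space returns actions_spaces=[['conv3x3', 'conv5x5']],
    # actions_to_config=[('conv1', 'layer_choice')], full_act_space=['conv3x3', 'conv5x5'],
    # observation_space=2 and nsteps=1.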
def _generate_action_mask(self):
"""
        Different steps could have different action spaces. To deal with this case, we merge all the
        possible actions into one action space, and use a mask to indicate the available actions for each step.
"""
two_masks = []
mask = []
for acts in self.actions_spaces:
one_mask = [0 for _ in range(len(self.full_act_space))]
for act in acts:
idx = self.full_act_space.index(act)
one_mask[idx] = 1
mask.append(one_mask)
two_masks.append(mask)
mask = []
for acts in self.actions_spaces:
one_mask = [-np.inf for _ in range(len(self.full_act_space))]
for act in acts:
idx = self.full_act_space.index(act)
one_mask[idx] = 0
mask.append(one_mask)
two_masks.append(mask)
return np.asarray(two_masks, dtype=np.float32)
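    # Note on the returned masks (shape (2, nsteps, len(full_act_space))): the first mask marks
    # valid actions with 1 and invalid ones with 0, while the second uses 0 for valid actions and
    # -inf for invalid ones; both are passed to the policy network through PPOModel so that,
    # presumably, invalid actions for a step can be masked out of the sampling logits.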
def update_search_space(self, search_space):
"""
        Get the search space; currently only the NAS search space is supported.
Parameters
----------
search_space : dict
Search space for NAS
            For the format, refer to the search space spec (https://nni.readthedocs.io/en/latest/Tutorial/SearchSpaceSpec.html).
"""
logger.info('update search space %s', search_space)
assert self.search_space is None
self.search_space = search_space
assert self.model_config.observation_space is None
assert self.model_config.action_space is None
self.actions_spaces, self.actions_to_config, self.full_act_space, obs_space, nsteps = self._process_nas_space(search_space)
self.model_config.observation_space = spaces.Discrete(obs_space)
self.model_config.action_space = spaces.Discrete(obs_space)
self.model_config.nsteps = nsteps
# generate mask in numpy
mask = self._generate_action_mask()
assert self.model is None
self.model = PPOModel(self.model_config, mask)
def _actions_to_config(self, actions):
"""
        Given actions, generate the corresponding trial configuration.
"""
chosen_arch = copy.deepcopy(self.chosen_arch_template)
for cnt, act in enumerate(actions):
act_name = self.full_act_space[act]
(_key, _type) = self.actions_to_config[cnt]
if _type == 'input_choice':
if act_name == 'None':
chosen_arch[_key] = {'_value': [], '_idx': []}
else:
candidates = self.search_space[_key]['_value']['candidates']
idx = candidates.index(act_name)
chosen_arch[_key] = {'_value': [act_name], '_idx': [idx]}
elif _type == 'layer_choice':
idx = self.search_space[_key]['_value'].index(act_name)
chosen_arch[_key] = {'_value': act_name, '_idx': idx}
else:
                raise ValueError('unrecognized type: {0}'.format(_type))
return chosen_arch
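    # A hedged example continuing the hypothetical 'conv1' layer_choice above: if the sampled
    # action maps to 'conv5x5', the generated configuration entry is
    #     {'conv1': {'_value': 'conv5x5', '_idx': 1}}
    # For input_choice keys, '_value' and '_idx' are lists instead (empty lists for 'None').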
def generate_multiple_parameters(self, parameter_id_list, **kwargs):
"""
Returns multiple sets of trial (hyper-)parameters, as iterable of serializable objects.
Parameters
----------
parameter_id_list : list of int
Unique identifiers for each set of requested hyper-parameters.
These will later be used in :meth:`receive_trial_result`.
**kwargs
Not used
Returns
-------
list
A list of newly generated configurations
"""
result = []
self.send_trial_callback = kwargs['st_callback']
for parameter_id in parameter_id_list:
had_exception = False
try:
logger.debug("generating param for %s", parameter_id)
res = self.generate_parameters(parameter_id, **kwargs)
except nni.NoMoreTrialError:
had_exception = True
if not had_exception:
result.append(res)
return result
def generate_parameters(self, parameter_id, **kwargs):
"""
        Generate parameters. If no trial configuration is available for now, ``self.credit`` is increased
        by one so that a configuration can be sent later when it becomes available.
Parameters
----------
parameter_id : int
Unique identifier for requested hyper-parameters.
This will later be used in :meth:`receive_trial_result`.
**kwargs
Not used
Returns
-------
dict
One newly generated configuration
"""
if self.first_inf:
self.trials_result = [None for _ in range(self.inf_batch_size)]
mb_obs, mb_actions, mb_values, mb_neglogpacs, mb_dones, last_values = self.model.inference(self.inf_batch_size)
self.trials_info = TrialsInfo(mb_obs, mb_actions, mb_values, mb_neglogpacs,
mb_dones, last_values, self.inf_batch_size)
self.first_inf = False
trial_info_idx, actions = self.trials_info.get_next()
if trial_info_idx is None:
logger.debug('Credit added by one in parameters request')
self.credit += 1
self.param_ids.append(parameter_id)
raise nni.NoMoreTrialError('no more parameters now.')
self.running_trials[parameter_id] = trial_info_idx
new_config = self._actions_to_config(actions)
return new_config
def _next_round_inference(self):
"""
        Run an inference to generate the next batch of configurations.
"""
logger.debug('Start next round inference...')
self.finished_trials = 0
self.model.compute_rewards(self.trials_info, self.trials_result)
self.model.train(self.trials_info, self.inf_batch_size)
self.running_trials = {}
# generate new trials
self.trials_result = [None for _ in range(self.inf_batch_size)]
mb_obs, mb_actions, mb_values, mb_neglogpacs, mb_dones, last_values = self.model.inference(self.inf_batch_size)
self.trials_info = TrialsInfo(mb_obs, mb_actions,
mb_values, mb_neglogpacs,
mb_dones, last_values,
self.inf_batch_size)
logger.debug('Next round inference complete.')
# check credit and submit new trials
for _ in range(self.credit):
trial_info_idx, actions = self.trials_info.get_next()
if trial_info_idx is None:
                logger.warning('Not enough trial configs; trials_per_update is suggested to be larger than trialConcurrency')
break
assert self.param_ids
param_id = self.param_ids.pop()
self.running_trials[param_id] = trial_info_idx
new_config = self._actions_to_config(actions)
self.send_trial_callback(param_id, new_config)
self.credit -= 1
            logger.debug('Sent new trial (%d, %s) to reduce credit', param_id, new_config)
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
"""
        Receive a trial's result. If the number of finished trials equals ``self.inf_batch_size``, start the next update to
train the model.
Parameters
----------
parameter_id : int
Unique identifier of used hyper-parameters, same with :meth:`generate_parameters`.
parameters : dict
Hyper-parameters generated by :meth:`generate_parameters`.
value : dict
Result from trial (the return value of :func:`nni.report_final_result`).
"""
trial_info_idx = self.running_trials.pop(parameter_id, None)
assert trial_info_idx is not None
value = extract_scalar_reward(value)
if self.optimize_mode == OptimizeMode.Minimize:
value = -value
self.trials_result[trial_info_idx] = value
self.finished_trials += 1
logger.debug('receive_trial_result, parameter_id %d, trial_info_idx %d, finished_trials %d, inf_batch_size %d',
parameter_id, trial_info_idx, self.finished_trials, self.inf_batch_size)
if self.finished_trials == self.inf_batch_size:
logger.debug('Start next round inference in receive_trial_result')
self._next_round_inference()
def trial_end(self, parameter_id, success, **kwargs):
"""
        Deal with trial failure. If a trial fails, it is popped from ``self.running_trials``,
        and its final result is assigned the average result of the finished trials.
Parameters
----------
parameter_id : int
Unique identifier for hyper-parameters used by this trial.
success : bool
True if the trial successfully completed; False if failed or terminated.
**kwargs
Not used
"""
if not success:
if parameter_id not in self.running_trials:
                logger.warning('The trial failed, but self.running_trials does not contain this trial')
return
trial_info_idx = self.running_trials.pop(parameter_id, None)
assert trial_info_idx is not None
# use mean of finished trials as the result of this failed trial
values = [val for val in self.trials_result if val is not None]
logger.warning('In trial_end, values: %s', values)
self.trials_result[trial_info_idx] = (np.mean(values)) if values else 0
self.finished_trials += 1
if self.finished_trials == self.inf_batch_size:
logger.debug('Start next round inference in trial_end')
self._next_round_inference()
def import_data(self, data):
"""
Import additional data for tuning, not supported yet.
Parameters
----------
data : list
            A list of dictionaries, each of which has at least two keys, ``parameter`` and ``value``.
"""
logger.warning('PPOTuner cannot leverage imported data.')