nni.nas.hub.pytorch.proxylessnas 源代码

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import math
from typing import Optional, Callable, List, Tuple, Iterator, Union, cast, overload

import torch
from torch import nn
from nni.mutable import MutableExpression
from nni.nas.nn.pytorch import ModelSpace, LayerChoice, Repeat, MutableConv2d, MutableLinear, MutableBatchNorm2d

from .utils.pretrained import load_pretrained_weight

MaybeIntChoice = Union[int, MutableExpression[int]]


@overload
def make_divisible(v: Union[int, float], divisor, min_val=None) -> int:
    ...


@overload
def make_divisible(v: Union[MutableExpression[int], MutableExpression[float]], divisor, min_val=None) -> MutableExpression[int]:
    ...


def make_divisible(v: Union[MutableExpression[int], MutableExpression[float], int, float], divisor, min_val=None) -> MaybeIntChoice:
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    """
    if min_val is None:
        min_val = divisor
    # This should work for both value choices and constants.
    new_v = MutableExpression.max(min_val, round(v + divisor // 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    return MutableExpression.condition(new_v < 0.9 * v, new_v + divisor, new_v)


def simplify_sequential(sequentials: List[nn.Module]) -> Iterator[nn.Module]:
    """
    Flatten the sequential blocks so that the hierarchy looks better.
    Eliminate identity modules automatically.
    """
    for module in sequentials:
        if isinstance(module, nn.Sequential):
            for submodule in module.children():
                # no recursive expansion
                if not isinstance(submodule, nn.Identity):
                    yield submodule
        else:
            if not isinstance(module, nn.Identity):
                yield module


class ConvBNReLU(nn.Sequential):
    """
    The template for a conv-bn-relu block.
    """

    def __init__(
        self,
        in_channels: MaybeIntChoice,
        out_channels: MaybeIntChoice,
        kernel_size: MaybeIntChoice = 3,
        stride: int = 1,
        groups: MaybeIntChoice = 1,
        norm_layer: Optional[Callable[[int], nn.Module]] = None,
        activation_layer: Optional[Callable[..., nn.Module]] = None,
        dilation: int = 1,
    ) -> None:
        padding = (kernel_size - 1) // 2 * dilation
        if norm_layer is None:
            norm_layer = MutableBatchNorm2d
        if activation_layer is None:
            activation_layer = nn.ReLU6
        # If no normalization is used, set bias to True
        # https://github.com/google-research/google-research/blob/20736344/tunas/rematlib/mobile_model_v3.py#L194
        norm = norm_layer(cast(int, out_channels))
        no_normalization = isinstance(norm, nn.Identity)
        blocks: List[nn.Module] = [
            MutableConv2d(
                cast(int, in_channels),
                cast(int, out_channels),
                cast(int, kernel_size),
                stride,
                cast(int, padding),
                dilation=dilation,
                groups=cast(int, groups),
                bias=no_normalization
            ),
            # Normalization, regardless of batchnorm or identity
            norm,
            # One pytorch implementation as an SE here, to faithfully reproduce paper
            # We follow a more accepted approach to put SE outside
            # Reference: https://github.com/d-li14/mobilenetv3.pytorch/issues/18
            activation_layer(inplace=True)
        ]

        super().__init__(*simplify_sequential(blocks))


class DepthwiseSeparableConv(nn.Sequential):
    """
    In the original MobileNetV2 implementation, this is InvertedResidual when expand ratio = 1.
    Residual connection is added if input and output shape are the same.

    References:

    - https://github.com/rwightman/pytorch-image-models/blob/b7cb8d03/timm/models/efficientnet_blocks.py#L90
    - https://github.com/google-research/google-research/blob/20736344/tunas/rematlib/mobile_model_v3.py#L433
    - https://github.com/ultmaster/AceNAS/blob/46c8895f/searchspace/proxylessnas/utils.py#L100
    """

    def __init__(
        self,
        in_channels: MaybeIntChoice,
        out_channels: MaybeIntChoice,
        kernel_size: MaybeIntChoice = 3,
        stride: int = 1,
        squeeze_excite: Optional[Callable[[MaybeIntChoice, MaybeIntChoice], nn.Module]] = None,
        norm_layer: Optional[Callable[[int], nn.Module]] = None,
        activation_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        blocks = [
            # dw
            ConvBNReLU(in_channels, in_channels, stride=stride, kernel_size=kernel_size, groups=in_channels,
                       norm_layer=norm_layer, activation_layer=activation_layer),
            # optional se
            squeeze_excite(in_channels, in_channels) if squeeze_excite else nn.Identity(),
            # pw-linear
            ConvBNReLU(in_channels, out_channels, kernel_size=1, norm_layer=norm_layer, activation_layer=nn.Identity)
        ]
        super().__init__(*simplify_sequential(blocks))
        # NOTE: "is" is used here instead of "==" to avoid creating a new value choice.
        self.has_skip = stride == 1 and in_channels is out_channels

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        if self.has_skip:
            return x + super().forward(x)
        else:
            return super().forward(x)


[文档] class InvertedResidual(nn.Sequential): """ An Inverted Residual Block, sometimes called an MBConv Block, is a type of residual block used for image models that uses an inverted structure for efficiency reasons. It was originally proposed for the `MobileNetV2 <https://arxiv.org/abs/1801.04381>`__ CNN architecture. It has since been reused for several mobile-optimized CNNs. It follows a narrow -> wide -> narrow approach, hence the inversion. It first widens with a 1x1 convolution, then uses a 3x3 depthwise convolution (which greatly reduces the number of parameters), then a 1x1 convolution is used to reduce the number of channels so input and output can be added. This implementation is sort of a mixture between: - https://github.com/google-research/google-research/blob/20736344/tunas/rematlib/mobile_model_v3.py#L453 - https://github.com/rwightman/pytorch-image-models/blob/b7cb8d03/timm/models/efficientnet_blocks.py#L134 Parameters ---------- in_channels The number of input channels. Can be a value choice. out_channels The number of output channels. Can be a value choice. expand_ratio The ratio of intermediate channels with respect to input channels. Can be a value choice. kernel_size The kernel size of the depthwise convolution. Can be a value choice. stride The stride of the depthwise convolution. squeeze_excite Callable to create squeeze and excitation layer. Take hidden channels and input channels as arguments. norm_layer Callable to create normalization layer. Take input channels as argument. activation_layer Callable to create activation layer. No input arguments. """ def __init__( self, in_channels: MaybeIntChoice, out_channels: MaybeIntChoice, expand_ratio: Union[float, MutableExpression[float]], kernel_size: MaybeIntChoice = 3, stride: int = 1, squeeze_excite: Optional[Callable[[MaybeIntChoice, MaybeIntChoice], nn.Module]] = None, norm_layer: Optional[Callable[[int], nn.Module]] = None, activation_layer: Optional[Callable[..., nn.Module]] = None, ) -> None: super().__init__() self.stride = stride self.out_channels = out_channels assert stride in [1, 2] hidden_ch = cast(int, make_divisible(in_channels * expand_ratio, 8)) # NOTE: this equivalence check (==) does NOT work for ValueChoice, need to use "is" self.has_skip = stride == 1 and in_channels is out_channels layers: List[nn.Module] = [ # point-wise convolution # NOTE: some paper omit this point-wise convolution when stride = 1. # In our implementation, if this pw convolution is intended to be omitted, # please use SepConv instead. ConvBNReLU(in_channels, hidden_ch, kernel_size=1, norm_layer=norm_layer, activation_layer=activation_layer), # depth-wise ConvBNReLU(hidden_ch, hidden_ch, stride=stride, kernel_size=kernel_size, groups=hidden_ch, norm_layer=norm_layer, activation_layer=activation_layer), # SE squeeze_excite( cast(int, hidden_ch), cast(int, in_channels) ) if squeeze_excite is not None else nn.Identity(), # pw-linear ConvBNReLU(hidden_ch, out_channels, kernel_size=1, norm_layer=norm_layer, activation_layer=nn.Identity), ] super().__init__(*simplify_sequential(layers)) def forward(self, x: torch.Tensor) -> torch.Tensor: if self.has_skip: return x + super().forward(x) else: return super().forward(x)
def inverted_residual_choice_builder( expand_ratios: List[int], kernel_sizes: List[int], downsample: bool, stage_input_width: int, stage_output_width: int, label: str ): def builder(index): stride = 1 inp = stage_output_width if index == 0: # first layer in stage # do downsample and width reshape inp = stage_input_width if downsample: stride = 2 oup = stage_output_width op_choices = {} for exp_ratio in expand_ratios: for kernel_size in kernel_sizes: op_choices[f'k{kernel_size}e{exp_ratio}'] = InvertedResidual(inp, oup, exp_ratio, kernel_size, stride) # It can be implemented with ValueChoice, but we use LayerChoice here # to be aligned with the intention of the original ProxylessNAS. return LayerChoice(op_choices, label=f'{label}_i{index}') return builder
[文档] class ProxylessNAS(ModelSpace): """ The search space proposed by `ProxylessNAS <https://arxiv.org/abs/1812.00332>`__. Following the official implementation, the inverted residual with kernel size / expand ratio variations in each layer is implemented with a :class:`~nni.retiarii.nn.pytorch.LayerChoice` with all-combination candidates. That means, when used in weight sharing, these candidates will be treated as separate layers, and won't be fine-grained shared. We note that :class:`MobileNetV3Space` is different in this perspective. This space can be implemented as part of :class:`MobileNetV3Space`, but we separate those following conventions. Parameters ---------- num_labels The number of labels for classification. base_widths Widths of each stage, from stem, to body, to head. Length should be 9. dropout_rate Dropout rate for the final classification layer. width_mult Width multiplier for the model. bn_eps Epsilon for batch normalization. bn_momentum Momentum for batch normalization. """ def __init__(self, num_labels: int = 1000, base_widths: Tuple[int, ...] = (32, 16, 32, 40, 80, 96, 192, 320, 1280), dropout_rate: float = 0., width_mult: float = 1.0, bn_eps: float = 1e-3, bn_momentum: float = 0.1): super().__init__() assert len(base_widths) == 9 # include the last stage info widths here widths = [make_divisible(width * width_mult, 8) for width in base_widths] downsamples = [True, False, True, True, True, False, True, False] self.num_labels = num_labels self.dropout_rate = dropout_rate self.bn_eps = bn_eps self.bn_momentum = bn_momentum self.stem = ConvBNReLU(3, widths[0], stride=2, norm_layer=MutableBatchNorm2d) blocks: List[nn.Module] = [ # first stage is fixed DepthwiseSeparableConv(widths[0], widths[1], kernel_size=3, stride=1) ] # https://github.com/ultmaster/AceNAS/blob/46c8895fd8a05ffbc61a6b44f1e813f64b4f66b7/searchspace/proxylessnas/__init__.py#L21 for stage in range(2, 8): # Rather than returning a fixed module here, # we return a builder that dynamically creates module for different `repeat_idx`. builder = inverted_residual_choice_builder( [3, 6], [3, 5, 7], downsamples[stage], widths[stage - 1], widths[stage], f's{stage}') if stage < 7: blocks.append(Repeat(builder, (1, 4), label=f's{stage}_depth')) else: # No mutation for depth in the last stage. # Directly call builder to initiate one block blocks.append(builder(0)) self.blocks = nn.Sequential(*blocks) # final layers self.feature_mix_layer = ConvBNReLU(widths[7], widths[8], kernel_size=1, norm_layer=MutableBatchNorm2d) self.global_avg_pooling = nn.AdaptiveAvgPool2d(1) self.dropout_layer = nn.Dropout(dropout_rate) self.classifier = MutableLinear(widths[-1], num_labels) reset_parameters(self, bn_momentum=bn_momentum, bn_eps=bn_eps) def forward(self, x): x = self.stem(x) x = self.blocks(x) x = self.feature_mix_layer(x) x = self.global_avg_pooling(x) x = x.view(x.size(0), -1) # flatten x = self.dropout_layer(x) x = self.classifier(x) return x def no_weight_decay(self): # this is useful for timm optimizer # no regularizer to linear layer if hasattr(self, 'classifier'): return {'classifier.weight', 'classifier.bias'} return set() @classmethod def load_searched_model( cls, name: str, pretrained: bool = False, download: bool = False, progress: bool = True ) -> nn.Module: init_kwargs = {} # all default if name == 'acenas-m1': arch = { 's2_depth': 2, 's2_i0': 'k3e6', 's2_i1': 'k3e3', 's3_depth': 3, 's3_i0': 'k5e3', 's3_i1': 'k3e3', 's3_i2': 'k5e3', 's4_depth': 2, 's4_i0': 'k3e6', 's4_i1': 'k5e3', 's5_depth': 4, 's5_i0': 'k7e6', 's5_i1': 'k3e6', 's5_i2': 'k3e6', 's5_i3': 'k7e3', 's6_depth': 4, 's6_i0': 'k7e6', 's6_i1': 'k7e6', 's6_i2': 'k7e3', 's6_i3': 'k7e3', 's7_depth': 1, 's7_i0': 'k7e6' } elif name == 'acenas-m2': arch = { 's2_depth': 1, 's2_i0': 'k5e3', 's3_depth': 3, 's3_i0': 'k3e6', 's3_i1': 'k3e3', 's3_i2': 'k5e3', 's4_depth': 2, 's4_i0': 'k7e6', 's4_i1': 'k5e6', 's5_depth': 4, 's5_i0': 'k5e6', 's5_i1': 'k5e3', 's5_i2': 'k5e6', 's5_i3': 'k3e6', 's6_depth': 4, 's6_i0': 'k7e6', 's6_i1': 'k5e6', 's6_i2': 'k5e3', 's6_i3': 'k5e6', 's7_depth': 1, 's7_i0': 'k7e6' } elif name == 'acenas-m3': arch = { 's2_depth': 2, 's2_i0': 'k3e3', 's2_i1': 'k3e6', 's3_depth': 2, 's3_i0': 'k5e3', 's3_i1': 'k3e3', 's4_depth': 3, 's4_i0': 'k5e6', 's4_i1': 'k7e6', 's4_i2': 'k3e6', 's5_depth': 4, 's5_i0': 'k7e6', 's5_i1': 'k7e3', 's5_i2': 'k7e3', 's5_i3': 'k5e3', 's6_depth': 4, 's6_i0': 'k7e6', 's6_i1': 'k7e3', 's6_i2': 'k7e6', 's6_i3': 'k3e3', 's7_depth': 1, 's7_i0': 'k5e6' } elif name == 'proxyless-cpu': arch = { 's2_depth': 4, 's2_i0': 'k3e6', 's2_i1': 'k3e3', 's2_i2': 'k3e3', 's2_i3': 'k3e3', 's3_depth': 4, 's3_i0': 'k3e6', 's3_i1': 'k3e3', 's3_i2': 'k3e3', 's3_i3': 'k5e3', 's4_depth': 2, 's4_i0': 'k3e6', 's4_i1': 'k3e3', 's5_depth': 4, 's5_i0': 'k5e6', 's5_i1': 'k3e3', 's5_i2': 'k3e3', 's5_i3': 'k3e3', 's6_depth': 4, 's6_i0': 'k5e6', 's6_i1': 'k5e3', 's6_i2': 'k5e3', 's6_i3': 'k3e3', 's7_depth': 1, 's7_i0': 'k5e6' } init_kwargs['base_widths'] = [40, 24, 32, 48, 88, 104, 216, 360, 1432] elif name == 'proxyless-gpu': arch = { 's2_depth': 1, 's2_i0': 'k5e3', 's3_depth': 2, 's3_i0': 'k7e3', 's3_i1': 'k3e3', 's4_depth': 2, 's4_i0': 'k7e6', 's4_i1': 'k5e3', 's5_depth': 3, 's5_i0': 'k5e6', 's5_i1': 'k3e3', 's5_i2': 'k5e3', 's6_depth': 4, 's6_i0': 'k7e6', 's6_i1': 'k7e6', 's6_i2': 'k7e6', 's6_i3': 'k5e6', 's7_depth': 1, 's7_i0': 'k7e6' } init_kwargs['base_widths'] = [40, 24, 32, 56, 112, 128, 256, 432, 1728] elif name == 'proxyless-mobile': arch = { 's2_depth': 2, 's2_i0': 'k5e3', 's2_i1': 'k3e3', 's3_depth': 4, 's3_i0': 'k7e3', 's3_i1': 'k3e3', 's3_i2': 'k5e3', 's3_i3': 'k5e3', 's4_depth': 4, 's4_i0': 'k7e6', 's4_i1': 'k5e3', 's4_i2': 'k5e3', 's4_i3': 'k5e3', 's5_depth': 4, 's5_i0': 'k5e6', 's5_i1': 'k5e3', 's5_i2': 'k5e3', 's5_i3': 'k5e3', 's6_depth': 4, 's6_i0': 'k7e6', 's6_i1': 'k7e6', 's6_i2': 'k7e3', 's6_i3': 'k7e3', 's7_depth': 1, 's7_i0': 'k7e6' } else: raise ValueError(f'Unsupported architecture with name: {name}') model_factory = cls.frozen_factory(arch) model = model_factory(**init_kwargs) if pretrained: weight_file = load_pretrained_weight(name, download=download, progress=progress) pretrained_weights = torch.load(weight_file) model.load_state_dict(pretrained_weights) return model
def reset_parameters(model, model_init='he_fout', init_div_groups=False, bn_momentum=0.1, bn_eps=1e-5): for m in model.modules(): if isinstance(m, nn.Conv2d): if model_init == 'he_fout': n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels if init_div_groups: n /= m.groups m.weight.data.normal_(0, math.sqrt(2. / n)) elif model_init == 'he_fin': n = m.kernel_size[0] * m.kernel_size[1] * m.in_channels if init_div_groups: n /= m.groups m.weight.data.normal_(0, math.sqrt(2. / n)) else: raise NotImplementedError elif isinstance(m, nn.BatchNorm2d): m.weight.data.fill_(1) m.bias.data.zero_() m.momentum = bn_momentum m.eps = bn_eps elif isinstance(m, nn.Linear): m.weight.data.normal_(0, 0.01) if m.bias is not None: m.bias.data.zero_() elif isinstance(m, nn.BatchNorm1d): m.weight.data.fill_(1) m.bias.data.zero_()