Source code for pytext.task.accelerator_lowering
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
from typing import Iterable, List, Tuple
import torch
accelerator_lowering_supported = True
try:
from accelerators.pytorch.lib.glow_decorator import accelerator
except ImportError:
accelerator_lowering_supported = False
from .nop_decorator import accelerator
print("Accelerator Lowering not supported!")
from pytext.config import ExportConfig
from pytext.models.representations.bilstm import BiLSTM
from pytext.models.roberta import RoBERTaEncoder
from pytext.utils.usage import log_accelerator_feature_usage
from torch import nn
def accelerator_transformerLayers_inputs(
    model: nn.Module,
    trace: torch.jit.ScriptFunction,
    export_options: ExportConfig,
    dataset_iterable: Iterable,
    module_path,
):
    """Generate example input specs for lowering the transformer layers.

    One spec is produced per (sequence length, batch size) combination taken
    from the padding controls of ``export_options``. Sequence lengths are
    limited to values below ``trace.get_max_seq_len()``, and that maximum is
    always included as the last length.

    Args:
        model: Model forwarded to ``accelerator.get_embedding_module_from_path``.
        trace: Traced model; must provide ``get_max_seq_len()``.
        export_options: Export configuration; a default ``ExportConfig`` is
            created when ``None`` is given.
        dataset_iterable: Currently unused; inputs are randomly generated.
        module_path: Path forwarded to
            ``accelerator.get_embedding_module_from_path``.

    Returns:
        A list with one ``torch_glow.input_specs_from_tensors`` result per
        positive (sequence length, batch size) pair.

    Raises:
        RuntimeError: If either padding control is missing from the config.
    """
    import torch_glow

    # we use the padding control from the Export Config:
    if export_options is None:
        export_options = ExportConfig()

    requested_seq_lens = export_options.seq_padding_control
    if requested_seq_lens is None:
        raise RuntimeError("seq padding control not specified")
    requested_batch_sizes = export_options.batch_padding_control
    if requested_batch_sizes is None:
        raise RuntimeError("batch padding control not specified")

    # Restrict seq_padding_control to valid ranges: keep everything strictly
    # below the trace's maximum and finish with the maximum itself.
    max_seq_len = trace.get_max_seq_len()
    seq_lens = [pad for pad in requested_seq_lens if pad < max_seq_len]
    seq_lens.append(max_seq_len)

    # this should use a method, or module_path, instead of being hardcoded
    # embedding_dim = model.encoder.encoder.transformer.token_embedding.embedding_dim
    # NOTE(review): the returned value is used directly as a tensor dimension
    # below - confirm the helper returns an int despite its name.
    embedding_dim = accelerator.get_embedding_module_from_path(model, module_path)

    # Non-positive sizes cannot describe a tensor shape, so they are dropped.
    usable_seq_lens = [length for length in seq_lens if length > 0]
    usable_batch_sizes = [size for size in requested_batch_sizes if size > 0]

    input_examples = []
    for seq_len in usable_seq_lens:
        for batch_size in usable_batch_sizes:
            # Todo: We directly generate data input instead of using dataset_iterable, enhance later
            encoded_example = torch.randn(
                [seq_len, batch_size, embedding_dim], dtype=torch.float32
            )
            mask_example = torch.randn([batch_size, seq_len]).bool()
            input_examples.append(
                torch_glow.input_specs_from_tensors([encoded_example, mask_example])
            )
    return input_examples
class AcceleratorTransformerLayersInternal(nn.Module):
    """Applies a sequence of layers and records every intermediate state."""

    def __init__(self, layers):
        """Store ``layers``, an iterable of callables taking (encoded, padding_mask)."""
        super().__init__()
        self.layers = layers

    def forward(
        self, encoded: torch.Tensor, padding_mask: torch.Tensor
    ) -> List[torch.Tensor]:
        """Run the input through each layer in order.

        Args:
            encoded: Input passed to the first layer.
            padding_mask: Mask handed unchanged to every layer.

        Returns:
            A list holding the original input followed by the output of each
            layer, in application order.
        """
        current = encoded
        collected = [current]
        for block in self.layers:
            current = block(current, padding_mask)
            collected.append(current)
        return collected
# `accelerator` comes from glow_decorator when available and otherwise falls back to the no-op decorator in .nop_decorator, so this decoration never fails with ImportError
@accelerator(
    [
        (
            "NNPI",
            {
                "NNPI_IceCores": "12",
                "NNPINumParallelChunks": "12",
                "NNPIUseGeluLUT": "true",
            },
        )
    ],
    inputs_function=accelerator_transformerLayers_inputs,
)
class AcceleratorTransformerLayers(nn.Module):
    """Transformer layer stack registered with the ``accelerator`` decorator.

    The decorator attaches the NNPI backend options above together with the
    function that generates example inputs for this module.
    """

    def __init__(self, layers):
        """Wrap ``layers`` in an ``AcceleratorTransformerLayersInternal``."""
        super().__init__()
        self.layers = AcceleratorTransformerLayersInternal(layers)

    def forward(
        self, encoded: torch.Tensor, padding_mask: torch.Tensor
    ) -> List[torch.Tensor]:
        """Delegate to the wrapped layer stack and return its list of states."""
        all_states = self.layers(encoded, padding_mask)
        return all_states
# Special reimplementation of transformer which separates the
# layers into a separate module for easy lowering to accelerator
class AcceleratorTransformer(nn.Module):
    """Transformer wrapper whose layer stack lives in its own submodule.

    All embedding-related components are taken over from the given
    transformer; only its layers are re-wrapped in
    ``AcceleratorTransformerLayers``.
    """

    def __init__(self, transformer):
        """Reuse the components of ``transformer`` and wrap its layers."""
        super().__init__()
        self.padding_idx = transformer.padding_idx
        self.token_embedding = transformer.token_embedding
        self.layers = AcceleratorTransformerLayers(transformer.layers)
        self.positional_embedding = transformer.positional_embedding
        self.embedding_layer_norm = transformer.embedding_layer_norm
        self.dropout = transformer.dropout

    def forward(self, tokens: torch.Tensor) -> List[torch.Tensor]:
        """Embed ``tokens`` and run them through the wrapped layers.

        Args:
            tokens: Token ids; positions equal to ``padding_idx`` are treated
                as padding.

        Returns:
            The list of states produced by the wrapped layer stack.
        """
        # compute padding mask. This is needed for multi-head attention
        padding_mask = tokens.eq(self.padding_idx)

        token_part = self.token_embedding(tokens)
        position_part = self.positional_embedding(tokens)
        normed = self.dropout(self.embedding_layer_norm(token_part + position_part))

        # account for padding while computing the representation:
        # padded positions are multiplied by 0, all others by 1
        keep_factor = 1 - padding_mask.unsqueeze(-1).type_as(normed)
        padded_normed = normed * keep_factor

        # B x T x C -> T x B x C
        encoded = padded_normed.transpose(0, 1)
        return self.layers(encoded, padding_mask)
def accelerator_lstm_inputs(
    model: nn.Module,
    trace: torch.jit.ScriptFunction,
    export_options: ExportConfig,
    dataset_iterable: Iterable,
    module_path,
):
    """Generate example input specs for lowering the LSTM layers.

    One spec is produced per (sequence length, batch size) combination taken
    from the padding controls of ``export_options``. Each spec describes three
    random float32 tensors: the embedded input and the hidden and cell states.

    Args:
        model: Unused here; kept for the common inputs-function signature.
        trace: Traced model; must expose ``embedding.word_embedding``,
            ``lstm_num_layers`` and ``lstm_dim``.
        export_options: Export configuration; a default ``ExportConfig`` is
            created when ``None`` is given.
        dataset_iterable: Currently unused; inputs are randomly generated.
        module_path: Unused here.

    Returns:
        A list with one ``torch_glow.input_specs_from_tensors`` result per
        positive (sequence length, batch size) pair.

    Raises:
        RuntimeError: If either padding control is missing from the config.
    """
    import torch_glow

    # we use the padding control from the Export Config:
    if export_options is None:
        export_options = ExportConfig()
    if export_options.seq_padding_control is None:
        raise RuntimeError("seq padding control not specified")
    if export_options.batch_padding_control is None:
        raise RuntimeError("batch padding control not specified")

    batch_sizes = export_options.batch_padding_control
    seq_lens = export_options.seq_padding_control

    # The example input is twice as wide as the word embedding.
    embedding_dim = trace.embedding.word_embedding.embedding_dim * 2
    lstm_num_layers = trace.lstm_num_layers
    lstm_dim = trace.lstm_dim

    # Non-positive sizes cannot describe a tensor shape, so they are dropped.
    usable_seq_lens = [length for length in seq_lens if length > 0]
    usable_batch_sizes = [size for size in batch_sizes if size > 0]

    input_examples = []
    for seq_len in usable_seq_lens:
        for batch_size in usable_batch_sizes:
            # Todo: We directly generate data input instead of using dataset_iterable, enhance later
            state_shape = [batch_size, lstm_num_layers, lstm_dim]
            example_tensors = [
                torch.randn(
                    [batch_size, seq_len, embedding_dim], dtype=torch.float32
                ),
                torch.randn(state_shape, dtype=torch.float32),  # hidden state
                torch.randn(state_shape, dtype=torch.float32),  # cell state
            ]
            input_examples.append(
                torch_glow.input_specs_from_tensors(example_tensors)
            )
    return input_examples
@accelerator(
    [("NNPI", {"NNPI_IceCores": "1", "NNPINumParallelChunks": "12"})],
    inputs_function=accelerator_lstm_inputs,
)
class AcceleratorLSTMLayers(nn.Module):
    """LSTM wrapper registered with the ``accelerator`` decorator.

    Inputs and states are accepted with the batch dimension first and are
    transposed before being handed to the wrapped LSTM, which is switched to
    ``batch_first = False``.
    """

    def __init__(self, lstm):
        """Wrap ``lstm``; note that its ``batch_first`` flag is set to False in place."""
        super().__init__()
        self.lstm = lstm
        self.num_layers = lstm.num_layers
        self.hidden_size = lstm.hidden_size
        self.lstm.batch_first = False  # NNPI only support batch_first = false

    def forward(
        self, lstm_input: torch.Tensor, hidden: torch.Tensor, cell: torch.Tensor
    ):
        """Run the wrapped LSTM on inputs whose first two dims are swapped.

        Args:
            lstm_input: Input tensor; dims 0 and 1 are swapped before the call.
            hidden: Hidden state; dims 0 and 1 are swapped before the call.
            cell: Cell state; dims 0 and 1 are swapped before the call.

        Returns:
            A tuple ``(rep, new_hidden, new_cell)`` exactly as produced by the
            wrapped LSTM (no transposition is applied to the outputs).
        """
        seq_first_input = lstm_input.transpose(0, 1)
        seq_first_state = (hidden.transpose(0, 1), cell.transpose(0, 1))
        rep, new_state = self.lstm(seq_first_input, seq_first_state)
        return rep, new_state[0], new_state[1]
class AcceleratorBiLSTM(nn.Module):
    """BiLSTM replacement that routes the LSTM through ``AcceleratorLSTMLayers``.

    Configuration attributes are copied from the given BiLSTM; its ``lstm`` is
    wrapped so that it can be lowered separately.
    """

    def __init__(self, biLSTM):
        """Copy settings from ``biLSTM`` and wrap its LSTM."""
        super().__init__()
        self.dropout = biLSTM.dropout
        self.pack_sequence = biLSTM.pack_sequence
        self.disable_sort_in_jit = biLSTM.disable_sort_in_jit
        self.lstm = AcceleratorLSTMLayers(biLSTM.lstm)
        self.representation_dim = biLSTM.representation_dim
        self.padding_value = biLSTM.padding_value

    def forward(
        self,
        embedded_tokens: torch.Tensor,
        seq_lengths: torch.Tensor,
        states: Tuple[torch.Tensor, torch.Tensor],
    ) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
        """Run the wrapped LSTM and return batch-leading outputs.

        Args:
            embedded_tokens: Embedded input handed to the wrapped LSTM.
            seq_lengths: Not used by this implementation.
            states: ``(hidden, cell)`` pair handed to the wrapped LSTM.

        Returns:
            ``(rep, (new_hidden, new_cell))`` where every tensor has had its
            first two dimensions swapped relative to the wrapped LSTM's output.
        """
        hidden_in, cell_in = states[0], states[1]
        rep, raw_hidden, raw_cell = self.lstm(embedded_tokens, hidden_in, cell_in)

        # The wrapped LSTM's output has the batch in dim 1; use it to regroup
        # the states as (num_layers, batch, hidden_size) before swapping back.
        layer_count = self.lstm.num_layers
        batch = rep.size(1)
        width = self.lstm.hidden_size

        new_hidden = raw_hidden.reshape(layer_count, batch, width).transpose(0, 1)
        new_cell = raw_cell.reshape(layer_count, batch, width).transpose(0, 1)
        return rep.transpose(0, 1), (new_hidden, new_cell)
# Swap in accelerator-friendly modules: the transformer of a RoBERTaEncoder encoder, or a BiLSTM representation
def swap_modules_for_accelerator(model):
    """Replace supported submodules of ``model`` with accelerator variants.

    * If ``model.encoder`` is a ``RoBERTaEncoder``, its inner transformer is
      wrapped in ``AcceleratorTransformer``.
    * Otherwise, if ``model.representation`` is a ``BiLSTM``, it is replaced
      by an ``AcceleratorBiLSTM``.
    * Any other model is left untouched.

    The model is modified in place and also returned.
    """
    if hasattr(model, "encoder") and isinstance(model.encoder, RoBERTaEncoder):
        transformer_owner = model.encoder.encoder
        transformer_owner.transformer = AcceleratorTransformer(
            transformer_owner.transformer
        )
    elif hasattr(model, "representation") and isinstance(
        model.representation, BiLSTM
    ):
        model.representation = AcceleratorBiLSTM(model.representation)
    return model
def lower_modules_to_accelerator(
    model: nn.Module, trace, export_options: ExportConfig, throughput_optimize=False
):
    """Lower the accelerator-decorated submodule of ``trace`` to NNPI via Glow.

    Lowering is attempted only when the model has a ``RoBERTaEncoder``
    encoder, an ``AcceleratorBiLSTM`` representation, or a ``lower_module``
    whose class is named ``CNNLowerModule``. Any other model gets its trace
    back unchanged.

    Args:
        model: Eager model used to locate the decorated submodule.
        trace: Traced/scripted counterpart of ``model``.
        export_options: Export configuration forwarded to the inputs function.
        throughput_optimize: When true, overrides the core/chunk options with
            "4" and sets a replication count of 3.

    Returns:
        The result of ``torch_glow.to_glow_selective`` (called with
        ``inplace=False``) for supported models, otherwise ``trace`` itself.

    Raises:
        RuntimeError: If accelerator support could not be imported, or if the
            decorated module was registered without an inputs function.
    """
    # Raise error if accelerator could not be imported
    if not accelerator_lowering_supported:
        raise RuntimeError("Accelerator Lowering not supported!")
    import torch_glow

    log_accelerator_feature_usage("build.NNPI")

    can_lower = (
        (hasattr(model, "encoder") and isinstance(model.encoder, RoBERTaEncoder))
        or (
            hasattr(model, "representation")
            and isinstance(model.representation, AcceleratorBiLSTM)
        )
        or (
            hasattr(model, "lower_module")
            # Internal CNN LM module to add accelerator support.
            and type(model.lower_module).__qualname__ == "CNNLowerModule"
        )
    )
    if not can_lower:
        return trace

    backend = "NNPI"
    (
        submod_modelpath,
        registered_spec_dict,
        inputs_function,
    ) = accelerator.get_modules(model, backend)[0]
    submod_tracepath = accelerator.model2trace_path(submod_modelpath)

    # Work on a private copy: the dict handed back by the decorator registry
    # may be shared, and the throughput overrides below must not leak into
    # later lowering calls.
    compilation_spec_dict = dict(registered_spec_dict)

    spec = torch_glow.CompilationSpec()
    spec.get_settings().set_glow_backend(backend)
    compilation_group = torch_glow.CompilationGroup()
    spec.compilation_groups_append(compilation_group)
    compilation_group_settings = compilation_group.get_settings()
    compilation_group_settings.set_convert_to_fp16(True)

    # Override the options for throughput-optimized case
    if throughput_optimize:
        compilation_spec_dict["NNPI_IceCores"] = "4"
        compilation_spec_dict["NNPINumParallelChunks"] = "4"
        compilation_group_settings.set_replication_count(3)

    for option_name, option_value in compilation_spec_dict.items():
        compilation_group_settings.backend_specific_opts_insert(
            option_name, option_value
        )

    if inputs_function is None:
        raise RuntimeError(
            "inputs_function needs to be specified in accelerator decorator"
        )
    # dataset_iterable is passed as None: the inputs functions generate
    # random example tensors themselves.
    input_sets = inputs_function(
        model, trace, export_options, None, submod_modelpath
    )
    compilation_group.set_input_sets(input_sets)

    trace = torch_glow.to_glow_selective(
        trace,
        {submod_tracepath: spec},
        inplace=False,
    )
    return trace