#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

import contextlib
import copy
import json
import sys
from itertools import chain
from typing import List, Optional, Tuple

import torch
from pytext.common import Padding, constants
from pytext.config.component import Component, ComponentType, create_component
from import (
from import Gazetteer
from import Token, Tokenizer
from pytext.torchscript.tensorizer import (
from pytext.torchscript.tokenizer import ScriptDoNothingTokenizer
from pytext.torchscript.utils import ScriptBatchInput, pad_3d, validate_padding_control
from pytext.torchscript.vocab import ScriptVocabulary
from pytext.utils import cuda, precision
from import Slot
from pytext.utils.file_io import PathManager
from pytext.utils.lazy import lazy_property
from pytext.utils.precision import maybe_half
from pytext.utils.usage import log_class_usage

from .utils import (

[docs]@contextlib.contextmanager def to_device(tensorizer_script_impl, device): cur_device = tensorizer_script_impl.device tensorizer_script_impl.device = device yield tensorizer_script_impl.device = cur_device
[docs]def tokenize( text: str = None, pre_tokenized: List[Token] = None, tokenizer: Tokenizer = None, bos_token: Optional[str] = None, eos_token: Optional[str] = None, pad_token: str = PAD, use_eos_token_for_bos: bool = False, max_seq_len: int = 2 ** 30, ): tokenized = ( pre_tokenized or tokenizer.tokenize(text)[ : max_seq_len - (bos_token is not None) - (eos_token is not None) ] ) if bos_token: if use_eos_token_for_bos: bos_token = eos_token tokenized = [Token(bos_token, -1, -1)] + tokenized if eos_token: tokenized.append(Token(eos_token, -1, -1)) if not tokenized: tokenized = [Token(pad_token, -1, -1)] tokenized_texts, start_idx, end_idx = zip( *((t.value, t.start, t.end) for t in tokenized) ) return tokenized_texts, start_idx, end_idx
[docs]def lookup_tokens( text: str = None, pre_tokenized: List[Token] = None, tokenizer: Tokenizer = None, vocab: Vocabulary = None, bos_token: Optional[str] = None, eos_token: Optional[str] = None, pad_token: str = PAD, use_eos_token_for_bos: bool = False, max_seq_len: int = 2 ** 30, ): tokenized_texts, start_idx, end_idx = tokenize( text, pre_tokenized, tokenizer, bos_token, eos_token, pad_token, use_eos_token_for_bos, max_seq_len, ) tokens = vocab.lookup_all(tokenized_texts) return tokens, start_idx, end_idx
[docs]class TensorizerScriptImpl(torch.nn.Module): device: str seq_padding_control: Optional[List[int]] batch_padding_control: Optional[List[int]] def __init__(self): super().__init__() self.device: str = "" # padding_control options: # None - no padding # [0, pad1, pad2, pad3,...] - pads sequence/batch length to smallest padX larger than sequence self.seq_padding_control = None self.batch_padding_control = None
[docs] @torch.jit.export def set_device(self, device: str): self.device = device
[docs] @torch.jit.export def set_padding_control(self, dimension: str, padding_control: Optional[List[int]]): """ This functions will be called to set a padding style. None - No padding List: first element 0, round seq length to the smallest list element larger than inputs """ if not validate_padding_control(padding_control): raise RuntimeError("Malformed padding_control value") if dimension == "sequence_length": self.seq_padding_control = padding_control elif dimension == "batch_length": self.batch_padding_control = padding_control else: raise RuntimeError("Illegal padding dimension specified.")
[docs] def batch_size(self, inputs: ScriptBatchInput) -> int: texts: Optional[List[List[str]]] = inputs.texts tokens: Optional[List[List[List[str]]]] = inputs.tokens if texts is not None: return len(texts) elif tokens is not None: return len(tokens) else: raise RuntimeError("Empty input for both texts and tokens.")
[docs] def row_size(self, inputs: ScriptBatchInput) -> int: texts: Optional[List[List[str]]] = inputs.texts tokens: Optional[List[List[List[str]]]] = inputs.tokens if texts is not None: return len(texts[0]) elif tokens is not None: return len(tokens[0]) else: raise RuntimeError("Empty input for both texts and tokens.")
[docs] def get_texts_by_index( self, texts: Optional[List[List[str]]], index: int ) -> Optional[List[str]]: if texts is None or len(texts) == 0: return None return texts[index]
[docs] def get_tokens_by_index( self, tokens: Optional[List[List[List[str]]]], index: int ) -> Optional[List[List[str]]]: if tokens is None or len(tokens) == 0: return None return tokens[index]
[docs] def tokenize(self, *args, **kwargs): """ This functions will receive the inputs from Clients, usually there are two possible inputs 1) a row of texts: List[str] 2) a row of pre-processed tokens: List[List[str]] Override this function to be TorchScriptable, e.g you need to declare concrete input arguments with type hints. """ raise NotImplementedError
[docs] def numberize(self, *args, **kwargs): """ This functions will receive the outputs from function: tokenize() or will be called directly from PyTextTensorizer function: numberize(). Override this function to be TorchScriptable, e.g you need to declare concrete input arguments with type hints. """ raise NotImplementedError
[docs] def tensorize(self, *args, **kwargs): """ This functions will receive a list(e.g a batch) of outputs from function numberize(), padding and convert to output tensors. Override this function to be TorchScriptable, e.g you need to declare concrete input arguments with type hints. """ raise NotImplementedError
[docs] @torch.jit.ignore def tensorize_wrapper(self, *args, **kwargs): """ This functions will receive a list(e.g a batch) of outputs from function numberize(), padding and convert to output tensors. It will be called in PyText Tensorizer during training time, this function is not torchscriptiable because it depends on cuda.device(). """ with to_device(self, cuda.device()): return self.tensorize(*args, **kwargs)
[docs] @torch.jit.ignore def torchscriptify(self): return torch.jit.script(self)
[docs]class Tensorizer(Component): """Tensorizers are a component that converts from batches of `` instances to tensors. These tensors will eventually be inputs to the model, but the model is aware of the tensorizers and can arrange the tensors they create to conform to its model. Tensorizers have an initialize function. This function allows the tensorizer to read through the training dataset to build up any data that it needs for creating the model. Commonly this is valuable for things like inferring a vocabulary from the training set, or learning the entire set of training labels, or slot labels, etc. """ __COMPONENT_TYPE__ = ComponentType.TENSORIZER __EXPANSIBLE__ = True __TENSORIZER_SCRIPT_IMPL__ = None
[docs] class Config(Component.Config): # Indicate if it can be used to generate input Tensors for prediction is_input: bool = True
[docs] @classmethod def from_config(cls, config: Config): return cls(config.is_input)
def __init__(self, is_input: bool = True): self.is_input = is_input log_class_usage(__class__) @property def column_schema(self): """Generic types don't pickle well pre-3.7, so we don't actually want to store the schema as an attribute. We're already storing all of the columns anyway, so until there's a better solution, schema is a property.""" return []
[docs] def numberize(self, row): raise NotImplementedError
[docs] def prepare_input(self, row): """ Return preprocessed input tensors/blob for caffe2 prediction net.""" return self.numberize(row)
[docs] def sort_key(self, row): raise NotImplementedError
[docs] def tensorize(self, batch): """Tensorizer knows how to pad and tensorize a batch of it's own output.""" return batch
[docs] def initialize(self, from_scratch=True): """ The initialize function is carefully designed to allow us to read through the training dataset only once, and not store it in memory. As such, it can't itself manually iterate over the data source. Instead, the initialize function is a coroutine, which is sent row data. This should look roughly like:: # set up variables here ... try: # start reading through data source while True: # row has type Dict[str, types.DataType] row = yield # update any variables, vocabularies, etc. ... except GeneratorExit: # finalize your initialization, set instance variables, etc. ... See `WordTokenizer.initialize` for a more concrete example. """ return # we need yield here to make this function a generator yield
@lazy_property def tensorizer_script_impl(self): # Script tensorizer is unpickleable, we use lazy_property for # lazy initialization to construct the object during run time. raise NotImplementedError def __getstate__(self): # make a shallow copy of state to avoid side effect on the original object state = copy.copy(vars(self)) state.pop("tensorizer_script_impl", None) return state
[docs] def stringify(self, token_indices): # Used in metric reporter to convert from tokens to string res = "" if hasattr(self, "vocab"): res = " ".join([self.vocab._vocab[index] for index in token_indices]) if hasattr(self, "tokenizer"): if hasattr(self.tokenizer, "decode"): res = self.tokenizer.decode(res) return res
[docs] def torchscriptify(self): return self.tensorizer_script_impl.torchscriptify()
[docs]class VocabFileConfig(Component.Config): #: File containing tokens to add to vocab (first whitespace-separated entry per #: line) filepath: str = "" #: Whether to skip the first line of the file (e.g. if it is a header line) skip_header_line: bool = False #: Whether to lowercase each of the tokens in the file lowercase_tokens: bool = False #: The max number of tokens to add to vocab size_limit: int = 0
[docs]class VocabConfig(Component.Config): #: Whether to add tokens from training data to vocab. build_from_data: bool = True #: Add `size_from_data` most frequent tokens in training data to vocab (if this #: is 0, add all tokens from training data). size_from_data: int = 0 #: Add `min_counts` filter out tokens in training data that with count smaller #: than min_counts. min_counts: int = 0 vocab_files: List[VocabFileConfig] = []
[docs]class TokenTensorizer(Tensorizer): """Convert text to a list of tokens. Do this based on a tokenizer configuration, and build a vocabulary for numberization. Finally, pad the batch to create a square tensor of the correct size. """
[docs] class Config(Tensorizer.Config): #: The name of the text column to parse from the data source. column: str = "text" #: The tokenizer to use to split input text into tokens. tokenizer: Tokenizer.Config = Tokenizer.Config() add_bos_token: bool = False add_eos_token: bool = False use_eos_token_for_bos: bool = False max_seq_len: Optional[int] = None vocab: VocabConfig = VocabConfig() vocab_file_delimiter: str = " "
[docs] @classmethod def from_config(cls, config: Config): tokenizer = create_component(ComponentType.TOKENIZER, config.tokenizer) return cls( text_column=config.column, tokenizer=tokenizer, add_bos_token=config.add_bos_token, add_eos_token=config.add_eos_token, use_eos_token_for_bos=config.use_eos_token_for_bos, max_seq_len=config.max_seq_len, vocab_config=config.vocab, vocab_file_delimiter=config.vocab_file_delimiter, is_input=config.is_input, )
def __init__( self, text_column, tokenizer=None, add_bos_token=Config.add_bos_token, add_eos_token=Config.add_eos_token, use_eos_token_for_bos=Config.use_eos_token_for_bos, max_seq_len=Config.max_seq_len, vocab_config=None, vocab=None, vocab_file_delimiter=" ", is_input=Config.is_input, ): self.text_column = text_column self.tokenizer = tokenizer or Tokenizer() self.vocab = vocab self.add_bos_token = add_bos_token self.add_eos_token = add_eos_token self.use_eos_token_for_bos = use_eos_token_for_bos self.max_seq_len = max_seq_len or 2 ** 30 # large number self.vocab_builder = None self.vocab_config = vocab_config or VocabConfig() self.vocab_file_delimiter = vocab_file_delimiter super().__init__(is_input) @property def column_schema(self): return [(self.text_column, str)] def _tokenize(self, text=None, pre_tokenized=None): return tokenize( text=text, pre_tokenized=pre_tokenized, tokenizer=self.tokenizer, bos_token=self.vocab.bos_token if self.add_bos_token else None, eos_token=self.vocab.eos_token if self.add_eos_token else None, pad_token=self.vocab.pad_token, use_eos_token_for_bos=self.use_eos_token_for_bos, max_seq_len=self.max_seq_len, ) def _lookup_tokens(self, text=None, pre_tokenized=None): return lookup_tokens( text=text, pre_tokenized=pre_tokenized, tokenizer=self.tokenizer, vocab=self.vocab, bos_token=self.vocab.bos_token if self.add_bos_token else None, eos_token=self.vocab.eos_token if self.add_eos_token else None, pad_token=self.vocab.pad_token, use_eos_token_for_bos=self.use_eos_token_for_bos, max_seq_len=self.max_seq_len, ) def _reverse_lookup(self, token_ids): return [self.vocab[id] for id in token_ids]
[docs] def initialize(self, vocab_builder=None, from_scratch=True): """Build vocabulary based on training corpus.""" if self.vocab and from_scratch: if self.vocab_config.build_from_data or self.vocab_config.vocab_files: print( f"`{self.text_column}` column: vocab already provided, skipping " f"adding tokens from data and from vocab files." ) return if not self.vocab_config.build_from_data and not self.vocab_config.vocab_files: raise ValueError( f"To create token tensorizer for '{self.text_column}', either " f"`build_from_data` or `vocab_files` must be set." ) if not self.vocab_builder: # else means not initialize from scratch, self.vocab_builder # would be set already self.vocab_builder = vocab_builder or VocabBuilder( delimiter=self.vocab_file_delimiter ) self.vocab_builder.use_bos = self.add_bos_token self.vocab_builder.use_eos = self.add_eos_token if not self.vocab_config.build_from_data: self._add_vocab_from_files() self.vocab = self.vocab_builder.make_vocab() return try: while True: row = yield raw_text = row[self.text_column] tokenized = self.tokenizer.tokenize(raw_text) self.vocab_builder.add_all([t.value for t in tokenized]) except GeneratorExit: self.vocab_builder.truncate_to_vocab_size( self.vocab_config.size_from_data, self.vocab_config.min_counts ) self._add_vocab_from_files() self.vocab = self.vocab_builder.make_vocab()
def _add_vocab_from_files(self): for vocab_file in self.vocab_config.vocab_files: with as f: self.vocab_builder.add_from_file( f, vocab_file.skip_header_line, vocab_file.lowercase_tokens, vocab_file.size_limit, )
[docs] def numberize(self, row): """Tokenize, look up in vocabulary.""" tokens, start_idx, end_idx = self._lookup_tokens(row[self.text_column]) token_ranges = list(zip(start_idx, end_idx)) return tokens, len(tokens), token_ranges
[docs] def prepare_input(self, row): """Tokenize, look up in vocabulary, return tokenized_texts in raw text""" tokenized_texts, start_idx, end_idx = self._tokenize(row[self.text_column]) token_ranges = list(zip(start_idx, end_idx)) return list(tokenized_texts), len(tokenized_texts), token_ranges
[docs] def tensorize(self, batch): tokens, seq_lens, token_ranges = zip(*batch) return ( pad_and_tensorize(tokens, self.vocab.get_pad_index()), pad_and_tensorize(seq_lens), pad_and_tensorize(token_ranges), )
[docs] def sort_key(self, row): # use seq_len as sort key return row[1]
[docs]class ByteTensorizer(Tensorizer): """Turn characters into sequence of int8 bytes. One character will have one or more bytes depending on it's encoding """ UNK_BYTE = 0 PAD_BYTE = 0 NUM = 256
[docs] class Config(Tensorizer.Config): #: The name of the text column to parse from the data source. column: str = "text" lower: bool = True max_seq_len: Optional[int] = None add_bos_token: Optional[bool] = False add_eos_token: Optional[bool] = False use_eos_token_for_bos: Optional[bool] = False
[docs] @classmethod def from_config(cls, config: Config): return cls( config.column, config.lower, config.max_seq_len, config.add_bos_token, config.add_eos_token, config.use_eos_token_for_bos, config.is_input, )
def __init__( self, text_column, lower=True, max_seq_len=None, add_bos_token=Config.add_bos_token, add_eos_token=Config.add_eos_token, use_eos_token_for_bos=Config.use_eos_token_for_bos, is_input=Config.is_input, ): self.text_column = text_column self.lower = lower self.max_seq_len = max_seq_len self.add_bos_token = add_bos_token self.add_eos_token = add_eos_token self.use_eos_token_for_bos = use_eos_token_for_bos super().__init__(is_input) @property def column_schema(self): return [(self.text_column, str)]
[docs] def numberize(self, row): """Convert text to characters.""" text = row[self.text_column].strip() if self.lower: text = text.lower() bytes = list(text.encode()) if self.max_seq_len: bytes = bytes[: self.max_seq_len] if self.add_bos_token: bos = BYTE_EOS if self.use_eos_token_for_bos else BYTE_BOS if bos in text: print('Special token "{}" exists in text "{}". Exit.'.format(bos, text)) sys.exit(1) bytes = list(bos.encode()) + bytes if self.add_eos_token: if BYTE_EOS in text: print( 'Special token "{}" exists in text "{}". Exit.'.format( BYTE_EOS, text ) ) sys.exit(1) bytes = bytes + list(BYTE_EOS.encode()) return bytes, len(bytes)
[docs] def tensorize(self, batch): bytes, bytes_len = zip(*batch) return pad_and_tensorize(bytes, self.PAD_BYTE), pad_and_tensorize(bytes_len)
[docs] def sort_key(self, row): # use bytes_len as sort key return row[1]
[docs]class ByteTokenTensorizer(Tensorizer): """Turn words into 2-dimensional tensors of int8 bytes. Words are padded to `max_byte_len`. Also computes sequence lengths (1-D tensor) and token lengths (2-D tensor). 0 is the pad byte. """ NUM_BYTES = 256
[docs] class Config(Tensorizer.Config): #: The name of the text column to parse from the data source. column: str = "text" #: The tokenizer to use to split input text into tokens. tokenizer: Tokenizer.Config = Tokenizer.Config() #: The max token length for input text. max_seq_len: Optional[int] = None #: The max byte length for a token. max_byte_len: int = 15 #: Offset to add to all non-padding bytes offset_for_non_padding: int = 0 add_bos_token: bool = False add_eos_token: bool = False use_eos_token_for_bos: bool = False
[docs] @classmethod def from_config(cls, config: Config): tokenizer = create_component(ComponentType.TOKENIZER, config.tokenizer) return cls( text_column=config.column, tokenizer=tokenizer, max_seq_len=config.max_seq_len, max_byte_len=config.max_byte_len, offset_for_non_padding=config.offset_for_non_padding, add_bos_token=config.add_bos_token, add_eos_token=config.add_eos_token, use_eos_token_for_bos=config.use_eos_token_for_bos, is_input=config.is_input, )
def __init__( self, text_column, tokenizer=None, max_seq_len=Config.max_seq_len, max_byte_len=Config.max_byte_len, offset_for_non_padding=Config.offset_for_non_padding, add_bos_token=Config.add_bos_token, add_eos_token=Config.add_eos_token, use_eos_token_for_bos=Config.use_eos_token_for_bos, is_input=Config.is_input, ): self.text_column = text_column self.tokenizer = tokenizer or Tokenizer() self.max_seq_len = max_seq_len or 2 ** 30 # large number self.max_byte_len = max_byte_len self.offset_for_non_padding = offset_for_non_padding self.add_bos_token = add_bos_token self.add_eos_token = add_eos_token self.use_eos_token_for_bos = use_eos_token_for_bos super().__init__(is_input) @property def column_schema(self): return [(self.text_column, str)]
[docs] def numberize(self, row): """Convert text to bytes, pad batch.""" tokens = self.tokenizer.tokenize(row[self.text_column])[ : (self.max_seq_len - self.add_bos_token - self.add_eos_token) ] if self.add_bos_token: bos = EOS if self.use_eos_token_for_bos else BOS tokens = [Token(bos, -1, -1)] + tokens if self.add_eos_token: tokens.append(Token(EOS, -1, -1)) if not tokens: tokens = [Token(PAD, -1, -1)] bytes = [self._numberize_token(token)[: self.max_byte_len] for token in tokens] token_lengths = len(tokens) byte_lengths = [len(token_bytes) for token_bytes in bytes] return bytes, token_lengths, byte_lengths
def _numberize_token(self, token): return [c + self.offset_for_non_padding for c in token.value.encode()]
[docs] def tensorize(self, batch, pad_token=0): bytes, token_lengths, byte_lengths = zip(*batch) # Set bytes shape because byte length should always be `max_byte_len` no # matter how long the bytes in the batch are. pad_shape = ( len(batch), precision.pad_length(max(len(length) for length in byte_lengths)), self.max_byte_len, ) return ( pad_and_tensorize(bytes, pad_shape=pad_shape, pad_token=pad_token), pad_and_tensorize(token_lengths), pad_and_tensorize(byte_lengths), )
[docs] def sort_key(self, row): return len(row[0])
[docs]class Float1DListTensorizer(Tensorizer): """ Tensorizes the 1d list of floats -- List[float] TODO: Even though very similar, 'FloatListTensorizer' currently does not support this vanilla case for tensorization of List[float]. In future, if 'FloatListTensorizer' accommodates this case, we do not need this separate tensorizer. """ __TENSORIZER_SCRIPT_IMPL__ = ScriptFloat1DListTensorizer
[docs] class Config(Tensorizer.Config): # inputs column: str = "float_list_column" pad_token: float = 1.0
[docs] @classmethod def from_config(cls, config: Config, **kwargs): return cls(config, **kwargs)
def __init__(self, config: Config, **kwargs): # mention link probability self.column = config.column self.pad_token = config.pad_token @property def column_schema(self): return [(self.column, List[float])]
[docs] def initialize(self, from_scratch=True): # start reading through data source while True: yield
[docs] def numberize(self, row): assert self.column in row, """1d float-list column not present in the data""" return row[self.column]
[docs] def tensorize(self, batch): values = pad_and_tensorize(batch, pad_token=self.pad_token, dtype=torch.float) return values
@lazy_property def tensorizer_script_impl(self): return ScriptFloat1DListTensorizer()
[docs]class Integer1DListTensorizer(Tensorizer): """ Tensorizes the 1d list of integers -- List[int] """ __TENSORIZER_SCRIPT_IMPL__ = ScriptInteger1DListTensorizer SPAN_PAD_IDX = 0
[docs] class Config(Tensorizer.Config): # inputs column: str = "int_list_column"
[docs] @classmethod def from_config(cls, config: Config, **kwargs): return cls(config, **kwargs)
def __init__(self, config: Config, **kwargs): self.column = config.column @property def column_schema(self): return [(self.column, List[int])]
[docs] def initialize(self, from_scratch=True): # start reading through data source while True: yield
[docs] def numberize(self, row): assert self.column in row, """Integer 1d list column not present in the data""" return row[self.column]
[docs] def tensorize(self, batch): values = pad_and_tensorize(batch, pad_token=self.SPAN_PAD_IDX) return values
@lazy_property def tensorizer_script_impl(self): return ScriptInteger1DListTensorizer()
[docs]class CharacterVocabTokenTensorizerScriptImpl(TensorizerScriptImpl): def __init__( self, add_bos_token: bool, add_eos_token: bool, use_eos_token_for_bos: bool, max_seq_len: int, vocab: Vocabulary, tokenizer: Optional[Tokenizer], ): super().__init__() if tokenizer is not None and hasattr(tokenizer, "torchscriptify"): try: self.tokenizer = tokenizer.torchscriptify() except NotImplementedError: # This is fine as long as the exported tokenizer is only used # in pre-tokenized mode self.tokenizer = None else: self.tokenizer = None self.do_nothing_tokenizer = ScriptDoNothingTokenizer() self.vocab = ScriptVocabulary( list(vocab), pad_idx=vocab.get_pad_index(), bos_idx=vocab.get_bos_index() if add_bos_token else -1, eos_idx=vocab.get_eos_index() if add_eos_token else -1, ) self.add_bos_token = add_bos_token self.add_eos_token = add_eos_token self.use_eos_token_for_bos = use_eos_token_for_bos self.max_seq_len = max_seq_len
[docs] def tokenize( self, row_text: Optional[str] = None, row_pre_tokenized: Optional[List[str]] = None, ) -> Tuple[List[List[str]], List[int]]: tokens: List[Tuple[str, int, int]] = [] char_tokens: List[List[str]] = [] char_tokens_lengths: List[int] = [] if row_text is not None: assert self.tokenizer is not None tokens = self.tokenizer.tokenize(row_text) elif row_pre_tokenized is not None: for token in row_pre_tokenized: tokens.extend(self.do_nothing_tokenizer.tokenize(token)) for token in tokens: chars: List[str] = [] for char in token[0]: chars.append(char) char_tokens.append(chars) char_tokens_lengths.append(len(chars)) return char_tokens, char_tokens_lengths
[docs] def numberize( self, char_tokens: List[List[str]], char_tokens_lengths: List[int] ) -> Tuple[List[List[int]], List[int]]: tokens: List[List[int]] = [] tokens = self.vocab.lookup_indices_2d(char_tokens) return tokens, char_tokens_lengths
[docs] def tensorize( self, tokens: List[List[List[int]]], tokens_lengths: List[List[int]], ) -> Tuple[torch.Tensor, torch.Tensor]: tokens_padded: List[List[List[int]]] = [] tokens_lengths_padded: List[List[int]] = [] tokens_padded, tokens_lengths_padded = pad_3d( tokens, tokens_lengths, self.vocab.get_pad_index() ) tokens_tensor: torch.Tensor = torch.tensor(tokens_padded, dtype=torch.long) tokens_lengths_tensor: torch.Tensor = torch.tensor( tokens_lengths_padded, dtype=torch.long ) return (tokens_tensor, tokens_lengths_tensor)
[docs] def get_texts_by_index( self, texts: Optional[List[List[str]]], index: int ) -> Optional[str]: if texts is None or len(texts) == 0: return None # CharacterVocabTokenTensorizer only works with a single text per row, stick with that return texts[index][0]
[docs] def get_tokens_by_index( self, tokens: Optional[List[List[List[str]]]], index: int ) -> Optional[List[str]]: if tokens is None or len(tokens) == 0: return None # CharacterVocabTokenTensorizer only works with a single text per row, stick with that return tokens[index][0]
[docs] def forward(self, inputs: ScriptBatchInput) -> Tuple[torch.Tensor, torch.Tensor]: tokens_3d: List[List[List[int]]] = [] seq_lens_2d: List[List[int]] = [] for idx in range(self.batch_size(inputs)): char_tokens: List[List[int]] = [] char_tokens_lengths: List[int] = [] char_tokens, char_tokens_lengths = self.tokenize( self.get_texts_by_index(inputs.texts, idx), self.get_tokens_by_index(inputs.tokens, idx), ) numberized: Tuple[List[List[int]], List[int]] = self.numberize( char_tokens, char_tokens_lengths ) tokens_3d.append(numberized[0]) seq_lens_2d.append(numberized[1]) return self.tensorize(tokens_3d, seq_lens_2d)
[docs]class CharacterVocabTokenTensorizer(Tensorizer): """Turn words into 2-dimensional tensors of ints based on the char vocab. Words are padded to the maximum word length (also capped at `max_char_length`). Sequence lengths are the length of each token. The difference with is that the CharacterTokenTensorizer uses the ascii value and does not require to build a vocab. Here we tensorize based on the vocab. """ __TENSORIZER_SCRIPT_IMPL__ = CharacterVocabTokenTensorizerScriptImpl
[docs] class Config(Tensorizer.Config): #: The name of the text column to parse from the data source. column: str = "text" #: The tokenizer to use to split input text into tokens. tokenizer: Tokenizer.Config = Tokenizer.Config() add_bos_token: bool = False add_eos_token: bool = False use_eos_token_for_bos: bool = False max_seq_len: Optional[int] = None vocab: VocabConfig = VocabConfig() vocab_file_delimiter: str = " "
[docs] @classmethod def from_config(cls, config: Config): tokenizer = create_component(ComponentType.TOKENIZER, config.tokenizer) return cls( text_column=config.column, tokenizer=tokenizer, add_bos_token=config.add_bos_token, add_eos_token=config.add_eos_token, use_eos_token_for_bos=config.use_eos_token_for_bos, max_seq_len=config.max_seq_len, vocab_config=config.vocab, vocab_file_delimiter=config.vocab_file_delimiter, is_input=config.is_input, )
def __init__( self, text_column, tokenizer=None, add_bos_token=Config.add_bos_token, add_eos_token=Config.add_eos_token, use_eos_token_for_bos=Config.use_eos_token_for_bos, max_seq_len=Config.max_seq_len, vocab_config=None, vocab=None, vocab_file_delimiter=" ", is_input=Config.is_input, ): self.text_column = text_column self.tokenizer = tokenizer or Tokenizer() self.vocab = vocab self.add_bos_token = add_bos_token self.add_eos_token = add_eos_token self.use_eos_token_for_bos = use_eos_token_for_bos self.max_seq_len = max_seq_len or 2 ** 30 # large number self.vocab_builder = None self.vocab_config = vocab_config or VocabConfig() self.vocab_file_delimiter = vocab_file_delimiter super().__init__(is_input) @property def column_schema(self): return [(self.text_column, str)]
[docs] def initialize(self, vocab_builder=None, from_scratch=True): """Build vocabulary based on training corpus.""" if self.vocab and from_scratch: if self.vocab_config.build_from_data or self.vocab_config.vocab_files: print( f"`{self.text_column}` column: vocab already provided, skipping " f"adding tokens from data and from vocab files." ) return if not self.vocab_config.build_from_data and not self.vocab_config.vocab_files: raise ValueError( f"To create token tensorizer for '{self.text_column}', either " f"`build_from_data` or `vocab_files` must be set." ) if not self.vocab_builder: # else means not initialize from scratch, self.vocab_builder # would be set already self.vocab_builder = vocab_builder or VocabBuilder( delimiter=self.vocab_file_delimiter ) self.vocab_builder.use_bos = self.add_bos_token self.vocab_builder.use_eos = self.add_eos_token if not self.vocab_config.build_from_data: self._add_vocab_from_files() self.vocab = self.vocab_builder.make_vocab() return try: while True: row = yield raw_text = row[self.text_column] tokenized = self.tokenizer.tokenize(raw_text) # tokenize the word tokens further char_tokenized = self.character_tokenize(tokenized) # build the vocab self.vocab_builder.add_all(char_tokenized) except GeneratorExit: self.vocab_builder.truncate_to_vocab_size( self.vocab_config.size_from_data, self.vocab_config.min_counts ) self._add_vocab_from_files() self.vocab = self.vocab_builder.make_vocab()
[docs] def character_tokenize(self, tokens: List[Token]): res = [] for token in tokens: chars = [] for char in token.value: chars.append(char) res.append(chars) return res
def _add_vocab_from_files(self): for vocab_file in self.vocab_config.vocab_files: with as f: self.vocab_builder.add_from_file( f, vocab_file.skip_header_line, vocab_file.lowercase_tokens, vocab_file.size_limit, )
[docs] def numberize(self, row): """Tokenize, look up in vocabulary.""" raw_text = row[self.text_column] tokenized = self.tokenizer.tokenize(raw_text) tokens_in_chars = self.character_tokenize(tokenized) char_tokens = self.vocab.lookup_all(tokens_in_chars) char_tokens_lengths = [len(token) for token in tokens_in_chars] return char_tokens, char_tokens_lengths
[docs] def tensorize(self, batch): char_tokens, char_tokens_lengths = zip(*batch) return ( pad_and_tensorize(char_tokens, self.vocab.get_pad_index()), pad_and_tensorize(char_tokens_lengths), )
@lazy_property def tensorizer_script_impl(self): return self.__TENSORIZER_SCRIPT_IMPL__( add_bos_token=self.add_bos_token, add_eos_token=self.add_eos_token, use_eos_token_for_bos=self.use_eos_token_for_bos, max_seq_len=self.max_seq_len, vocab=self.vocab, tokenizer=self.tokenizer, )
[docs]class CharacterTokenTensorizer(TokenTensorizer): """Turn words into 2-dimensional tensors of ints based on their ascii values. Words are padded to the maximum word length (also capped at `max_char_length`). Sequence lengths are the length of each token, 0 for pad token. """
[docs] class Config(TokenTensorizer.Config): #: The max character length for a token. max_char_length: int = 20
def __init__(self, max_char_length: int = Config.max_char_length, **kwargs): self.max_char_length = max_char_length super().__init__(**kwargs) # Don't need to create a vocab initialize = Tensorizer.initialize
[docs] def numberize(self, row): """Convert text to characters, pad batch.""" tokens = self.tokenizer.tokenize(row[self.text_column])[: self.max_seq_len] characters = [ self._numberize_token(token)[: self.max_char_length] for token in tokens ] token_lengths = len(tokens) char_lengths = [len(token_chars) for token_chars in characters] return characters, token_lengths, char_lengths
def _numberize_token(self, token): return [ord(c) for c in token.value]
[docs] def tensorize(self, batch): characters, token_lengths, char_lengths = zip(*batch) return ( pad_and_tensorize(characters), pad_and_tensorize(token_lengths), pad_and_tensorize(char_lengths), )
[docs] def sort_key(self, row): return len(row[0])
[docs]class LabelTensorizer(Tensorizer): """Numberize labels. Label can be used as either input or target. NB: if the labels are used as targets for binary classification with a loss such as cosine distance, the order of the `label_vocab` *does* matter, and it should be `[negative_class, positive_class]`. """ __EXPANSIBLE__ = True
[docs] class Config(Tensorizer.Config): #: The name of the label column to parse from the data source. column: str = "label" #: Whether to allow for unknown labels at test/prediction time. allow_unknown: bool = False #: Whether vocab should have pad, usually false when label is used as target. pad_in_vocab: bool = False #: The label values, if known. Will skip initialization step if provided. label_vocab: Optional[List[str]] = None #: File with the label values. This can be used when the label space is #: too large to specify these as a list. The file should not contain #: a header. label_vocab_file: Optional[str] = None # Indicate if it can be used to generate input Tensors for prediction. is_input: bool = False #: Add these labels to the vocabulary during the initialization step (only #: if the initialization step is not skipped). Useful when the dataset may #: not include all labels, as for incremental trainings. add_labels: Optional[List[str]] = None
[docs] @classmethod def from_config(cls, config: Config): return cls( config.column, config.allow_unknown, config.pad_in_vocab, config.label_vocab, config.label_vocab_file, config.is_input, config.add_labels, )
def __init__( self, label_column: str = "label", allow_unknown: bool = False, pad_in_vocab: bool = False, label_vocab: Optional[List[str]] = None, label_vocab_file: Optional[str] = None, is_input: bool = Config.is_input, add_labels: Optional[List[str]] = None, ): self.label_column = label_column self.pad_in_vocab = pad_in_vocab self.vocab_builder = VocabBuilder() self.vocab_builder.use_pad = pad_in_vocab self.vocab_builder.use_unk = allow_unknown self.add_labels = add_labels self.vocab = None self.pad_idx = -1 assert ( label_vocab is None or label_vocab_file is None ), "Cannot specify both label_vocab and label_vocab_file" if label_vocab: self.vocab_builder.add_all(label_vocab) self.vocab, self.pad_idx = self._create_vocab() elif label_vocab_file: with as f: self.vocab_builder.add_from_file( f, skip_header_line=False, lowercase_tokens=False, size=None ) self.vocab, self.pad_idx = self._create_vocab() super().__init__(is_input) @property def column_schema(self): return [(self.label_column, str)]
[docs] def initialize(self, from_scratch=True): """ Look through the dataset for all labels and create a vocab map for them. """ if self.vocab and from_scratch: return try: while True: row = yield labels = row[self.label_column] self.vocab_builder.add_all(labels) except GeneratorExit: if self.add_labels: self.vocab_builder.add_all(self.add_labels) self.vocab, self.pad_idx = self._create_vocab()
def _create_vocab(self): if not self.vocab_builder.has_added_tokens(): error_msg = ( "Label classes are not specified, and no examples or labels were found " "in training data. Either the training data is empty, or the data " "fields are misnamed and no examples are parsed (warnings would appear " "in preceding stdout logs)." ) raise ValueError(error_msg) vocab = self.vocab_builder.make_vocab() pad_idx = ( vocab.get_pad_index() if self.pad_in_vocab else Padding.DEFAULT_LABEL_PAD_IDX ) return vocab, pad_idx
[docs] def numberize(self, row): """Numberize labels.""" return self.vocab.lookup_all(row[self.label_column])
[docs] def tensorize(self, batch): return pad_and_tensorize(batch, self.pad_idx)
[docs]class LabelListTensorizer(LabelTensorizer): """LabelListTensorizer takes a list of labels as input and generate a tuple of tensors (label_idx, list_length). """
[docs] class Config(LabelTensorizer.Config): # pad missing label in the list, including None and empty pad_missing: bool = False
[docs] @classmethod def from_config(cls, config: Config): return cls( config.column, config.allow_unknown, config.pad_in_vocab, config.label_vocab, config.label_vocab_file, config.is_input, pad_missing=config.pad_missing, )
def __init__(self, *args, pad_missing: bool = False, **kwargs): super().__init__(*args, **kwargs) self.pad_missing = pad_missing def __setstate__(self, newstate): # for backward compatibility if "pad_missing" not in newstate: newstate["pad_missing"] = True self.__dict__.update(newstate) @property def column_schema(self): return [(self.label_column, List[str])]
[docs] def numberize(self, row): label_idx_list = [] for label in row[self.label_column]: # Only None and empty is viewed as missing data, values like "False" is legit if label in [None, ""]: if self.pad_missing: label_idx_list.append(self.pad_idx) else: raise Exception( "Found none or empty value in the list, \ while pad_missing is disabled" ) else: label_idx_list.append(self.vocab.lookup_all(label)) return label_idx_list, len(label_idx_list)
[docs] def tensorize(self, batch): labels, labels_len = zip(*batch) return super().tensorize(labels), pad_and_tensorize(labels_len)
[docs] def sort_key(self, row): # use list length as sort key return row[1]
[docs]class LabelListRankTensorizer(LabelTensorizer): """LabelListRankTensorizer takes a list of a single array with [[labelA, rankA], [labelB, rankB], ...] as input and generate a tuple of tensors (label_idx, list_length). Example: Input: ["[\"weather\",\"1\"]","[\"business\",\"1\"]"] Output of size len(vocab) {"timer", "weather", "business"} => [0, 1, 1]. This would suggest both labels are of equal rank. """
[docs] class Config(LabelTensorizer.Config): # pad missing label in the list, including None and empty pad_missing: bool = False
[docs] @classmethod def from_config(cls, config: Config): return cls( config.column, config.allow_unknown, config.pad_in_vocab, config.label_vocab, config.label_vocab_file, config.is_input, pad_missing=config.pad_missing, )
def __init__(self, *args, pad_missing: bool = False, **kwargs): super().__init__(*args, **kwargs) self.pad_missing = pad_missing def __setstate__(self, newstate): # for backward compatibility if "pad_missing" not in newstate: newstate["pad_missing"] = True self.__dict__.update(newstate) @property def column_schema(self): return [(self.label_column, List[str])]
[docs] def numberize(self, row): label_idx_list = [0] * len(self.vocab) elem_struct_0 = list(map(json.loads, row[self.label_column])) for elemRow in elem_struct_0: label = elemRow[0] labelRank = int(elemRow[1]) # Only None and empty is viewed as missing data, values like "False" is legit if label in [None, ""]: if self.pad_missing: raise Exception("Invalid state for LabelStructTensorizer") else: raise Exception( "Found none or empty value in the list, \ while pad_missing is disabled" ) else: if labelRank == 1: label_idx_list[self.vocab.lookup_all(label)] = 1 return label_idx_list, len(label_idx_list)
[docs] def tensorize(self, batch): labels, labels_len = zip(*batch) return super().tensorize(labels), pad_and_tensorize(labels_len)
[docs] def sort_key(self, row): # use list length as sort key return row[1]
[docs] def initialize(self, from_scratch=True): """ Look through the dataset for all labels and create a vocab map for them. """ if self.vocab and from_scratch: return try: while True: row = yield elem_struct_0 = list(map(json.loads, row[self.label_column])) for elemRow in elem_struct_0: self.vocab_builder.add_all(elemRow[0]) except GeneratorExit: if self.add_labels: self.vocab_builder.add_all(self.add_labels) self.vocab, self.pad_idx = self._create_vocab()
[docs]class UidTensorizer(Tensorizer): """Numberize user IDs which can be either strings or tensors."""
[docs] class Config(Tensorizer.Config): column: str = "uid" # Allow unknown users during prediction. allow_unknown: bool = True
[docs] @classmethod def from_config(cls, config: Config): return cls(config.column, config.allow_unknown, config.is_input)
def __init__( self, uid_column: str = "uid", allow_unknown: bool = True, is_input: bool = Config.is_input, ): self.uid_column = uid_column self.vocab_builder = VocabBuilder() # User IDs should have the same lengths so need not to use padding. self.vocab_builder.use_pad = False self.vocab_builder.use_unk = allow_unknown self.vocab = None self.pad_idx = -1 super().__init__(is_input) @property def column_schema(self): return [(self.uid_column, str)] def _get_row_value_as_str(self, row) -> str: """Handle the case that the row value is not a string.""" row_value = row[self.uid_column] if isinstance(row_value, torch.Tensor): assert ( row_value.dim() == 0 or len(row_value) == 1 ), "Cannot get the value of multi-dimensional tensors." row_value = str(row_value.item()) return row_value
[docs] def initialize(self, from_scratch=True): """ Look through the dataset for all uids and create a vocab map for them. """ if self.vocab and from_scratch: return try: while True: row = yield uids = self._get_row_value_as_str(row) self.vocab_builder.add_all(uids) except GeneratorExit: self.vocab, self.pad_idx = self._create_vocab()
def _create_vocab(self): vocab = self.vocab_builder.make_vocab() pad_idx = Padding.DEFAULT_LABEL_PAD_IDX return vocab, pad_idx
[docs] def numberize(self, row): """Numberize uids.""" return self.vocab.lookup_all(self._get_row_value_as_str(row))
[docs] def tensorize(self, batch): return pad_and_tensorize(batch, self.pad_idx)
[docs]class SoftLabelTensorizer(LabelTensorizer): """ Handles numberizing labels for knowledge distillation. This still requires the same label column as `LabelTensorizer` for the "true" label, but also processes soft "probabilistic" labels generated from a teacher model, via three new columns. """
[docs] class Config(LabelTensorizer.Config): probs_column: str = "target_probs" logits_column: str = "target_logits" labels_column: str = "target_labels"
[docs] @classmethod def from_config(cls, config: Config): return cls( config.column, config.allow_unknown, config.pad_in_vocab, config.label_vocab, config.probs_column, config.logits_column, config.labels_column, config.label_vocab_file, config.is_input, )
def __init__( self, label_column: str = "label", allow_unknown: bool = False, pad_in_vocab: bool = False, label_vocab: Optional[List[str]] = None, probs_column: str = "target_probs", logits_column: str = "target_logits", labels_column: str = "target_labels", label_vocab_file: Optional[str] = None, is_input: bool = Config.is_input, ): super().__init__( label_column, allow_unknown, pad_in_vocab, label_vocab, label_vocab_file, is_input, ) self.probs_column = probs_column self.logits_column = logits_column self.labels_column = labels_column @property def column_schema(self): return [ (self.label_column, str), (self.probs_column, List[float]), (self.logits_column, List[float]), (self.labels_column, List[str]), ]
[docs] def numberize(self, row): """Numberize hard and soft labels""" label = self.vocab.lookup_all(row[self.label_column]) row_labels = row[self.labels_column] probs = align_target_label(row[self.probs_column], row_labels, self.vocab.idx) logits = align_target_label(row[self.logits_column], row_labels, self.vocab.idx) return label, probs, logits
[docs] def tensorize(self, batch): label, probs, logits = zip(*batch) # Set probs and logits shape because they should not change with fp16 probs_shape = len(probs), len(self.vocab) return ( pad_and_tensorize(label, self.pad_idx), pad_and_tensorize(probs, dtype=torch.float, pad_shape=probs_shape), pad_and_tensorize(logits, dtype=torch.float, pad_shape=probs_shape), )
[docs]class NumericLabelTensorizer(Tensorizer): """Numberize numeric labels."""
[docs] class Config(Tensorizer.Config): #: The name of the label column to parse from the data source. column: str = "label" #: If provided, the range of values the raw label can be. Will rescale the #: label values to be within [0, 1]. rescale_range: Optional[List[float]] = None # Indicate if it can be used to generate input Tensors for prediction is_input: bool = False
[docs] @classmethod def from_config(cls, config: Config): return cls(config.column, config.rescale_range, config.is_input)
def __init__( self, label_column: str = Config.column, rescale_range: Optional[List[float]] = Config.rescale_range, is_input: bool = Config.is_input, ): self.label_column = label_column if rescale_range is not None: assert len(rescale_range) == 2 assert rescale_range[0] < rescale_range[1] self.rescale_range = rescale_range super().__init__(is_input) @property def column_schema(self): return [(self.label_column, str)]
[docs] def numberize(self, row): """Numberize labels.""" label = float(row[self.label_column]) if self.rescale_range is not None: label -= self.rescale_range[0] label /= self.rescale_range[1] - self.rescale_range[0] assert 0 <= label <= 1 return label
[docs] def tensorize(self, batch): return pad_and_tensorize(batch, dtype=torch.float)
[docs]class FloatListTensorizer(Tensorizer): """Numberize numeric labels."""
[docs] class Config(Tensorizer.Config): #: The name of the label column to parse from the data source. column: str error_check: bool = False dim: Optional[int] = None # If you wish to normalize the training data here, you probably also # want to normalize the inference data. This is currently supported with # TorchScript models (see DocModel). See T48207828 for progress on # supporting Caffe2 models. normalize: bool = False
[docs] @classmethod def from_config(cls, config: Config): return cls( config.column, config.error_check, config.dim, config.normalize, config.is_input, )
def __init__( self, column: str, error_check: bool, dim: Optional[int], normalize: bool, is_input: bool = Config.is_input, ): self.column = column self.error_check = error_check self.dim = dim assert not normalize or self.dim is not None, "Normalization requires dim" assert not self.error_check or self.dim is not None, "Error check requires dim" # If normalize and error_check both are false and dim is still None, set # it to 0 so that it can successfully create VectorNormalizer if dim is None: dim = 0 self.normalizer = VectorNormalizer(dim, normalize) super().__init__(is_input) @property def column_schema(self): return [(self.column, List[float])]
[docs] def initialize(self): if not self.normalizer.do_normalization: self.normalizer.calculate_feature_stats() return try: while True: row = yield res = row[self.column] self.normalizer.update_meta_data(res) except GeneratorExit: self.normalizer.calculate_feature_stats()
[docs] def numberize(self, row): dense = row[self.column] if self.error_check: assert ( len(dense) == self.dim ), f"Dense feature didn't match expected dimension {self.dim}: {dense}" return self.normalizer.normalize([dense])[0]
[docs] def tensorize(self, batch): # training in fp16 will pad tensor shape to multiple of 8 unless # explicitly specify pad_shape to avoid padding. pad_shape = (len(batch), self.dim) if self.dim else None return maybe_half( pad_and_tensorize(batch, dtype=torch.float, pad_shape=pad_shape) )
NO_LABEL = constants.Token("NoLabel")
[docs]class SlotLabelTensorizer(Tensorizer): """Numberize word/slot labels."""
[docs] class Config(Tensorizer.Config): #: The name of the slot label column to parse from the data source. slot_column: str = "slots" #: The name of the text column to parse from the data source. #: We need this to be able to generate tensors which correspond to input text. text_column: str = "text" #: The tokenizer to use to split input text into tokens. This should be #: configured in a way which yields tokens consistent with the tokens input to #: or output by a model, so that the labels generated by this tensorizer #: will match the indices of the model's tokens. tokenizer: Tokenizer.Config = Tokenizer.Config() #: Whether to allow for unknown labels at test/prediction time allow_unknown: bool = False # Indicate if it can be used to generate input Tensors for prediction is_input: bool = False
[docs] @classmethod def from_config(cls, config: Config): tokenizer = create_component(ComponentType.TOKENIZER, config.tokenizer) return cls( config.slot_column, config.text_column, tokenizer, config.allow_unknown, config.is_input, )
def __init__( self, slot_column: str = Config.slot_column, text_column: str = Config.text_column, tokenizer: Tokenizer = None, allow_unknown: bool = Config.allow_unknown, is_input: bool = Config.is_input, ): self.slot_column = slot_column self.text_column = text_column self.allow_unknown = allow_unknown self.tokenizer = tokenizer or Tokenizer() self.pad_idx = Padding.DEFAULT_LABEL_PAD_IDX self.vocab_builder = VocabBuilder() self.vocab_builder.add(NO_LABEL) self.vocab_builder.use_pad = False self.vocab_builder.use_unk = self.allow_unknown self.vocab = None super().__init__(is_input) @property def column_schema(self): return [(self.text_column, str), (self.slot_column, List[Slot])]
[docs] def initialize(self, from_scratch=True): """Look through the dataset for all labels and create a vocab map for them.""" if self.vocab and from_scratch: return try: while True: row = yield slots = row[self.slot_column] self.vocab_builder.add_all(s.label for s in slots) except GeneratorExit: self.vocab = self.vocab_builder.make_vocab()
[docs] def numberize(self, row): """ Turn slot labels and text into a list of token labels with the same length as the number of tokens in the text. """ slots = row[self.slot_column] text = row[self.text_column] tokens = self.tokenizer.tokenize(text) indexed_tokens = tokens labels = [] current_slot = 0 current_token = 0 while current_token < len(tokens) and current_slot < len(slots): _, start, end = indexed_tokens[current_token] slot = slots[current_slot] if start > slot.end: current_slot += 1 else: current_token += 1 labels.append(slot.label if end > slot.start else NO_LABEL) labels += [NO_LABEL] * (len(tokens) - current_token) return self.vocab.lookup_all(labels)
[docs] def tensorize(self, batch): return pad_and_tensorize(batch, dtype=torch.long)
[docs]class SlotLabelTensorizerExpansible(SlotLabelTensorizer): """Create a base SlotLabelTensorizer to support selecting different types in ModelInput.""" __EXPANSIBLE__ = True
[docs]class GazetteerTensorizer(Tensorizer): """ Create 3 tensors for dict features. - idx: index of feature in token order. - weights: weight of feature in token order. - lens: number of features per token. For each input token, there will be the same number of `idx` and `weights` entries. (equal to the max number of features any token has in this row). The values in `lens` will tell how many of these features are actually used per token. Input format for the dict column is json and should be a list of dictionaries containing the "features" and their weight for each relevant "tokenIdx". Example: :: text: "Order coffee from Starbucks please" dict: [ {"tokenIdx": 1, "features": {"drink/beverage": 0.8, "music/song": 0.2}}, {"tokenIdx": 3, "features": {"store/coffee_shop": 1.0}} ] if we assume this vocab :: vocab = { UNK: 0, PAD: 1, "drink/beverage": 2, "music/song": 3, "store/coffee_shop": 4 } this example will result in those tensors: :: idx = [1, 1, 2, 3, 1, 1, 4, 1, 1, 1] weights = [0.0, 0.0, 0.8, 0.2, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0] lens = [1, 2, 1, 1, 1] """
[docs] class Config(Tensorizer.Config): text_column: str = "text" dict_column: str = "dict" #: tokenizer to split text and create dict tensors of the same size. tokenizer: Tokenizer.Config = Tokenizer.Config()
[docs] @classmethod def from_config(cls, config: Config): tokenizer = create_component(ComponentType.TOKENIZER, config.tokenizer) return cls(config.text_column, config.dict_column, tokenizer, config.is_input)
def __init__( self, text_column: str = Config.text_column, dict_column: str = Config.dict_column, tokenizer: Tokenizer = None, is_input: bool = Config.is_input, ): self.text_column = text_column self.dict_column = dict_column self.tokenizer = tokenizer or Tokenizer() self.vocab_builder = VocabBuilder() self.vocab = None super().__init__(is_input) @property def column_schema(self): return [(self.text_column, str), (self.dict_column, Gazetteer)]
[docs] def initialize(self, from_scratch=True): """ Look through the dataset for all dict features to create vocab. """ if self.vocab and from_scratch: return try: while True: row = yield for token_dict in row[self.dict_column]: self.vocab_builder.add_all(token_dict["features"]) except GeneratorExit: self.vocab = self.vocab_builder.make_vocab()
[docs] def numberize(self, row): """ Numberize dict features. Fill in for tokens with no features with PAD and weight 0.0. All tokens need to have at least one entry. Tokens with more than one feature will have multiple idx and weight added in sequence. """ num_tokens = len(self.tokenizer.tokenize(row[self.text_column])) num_labels = max(len(t["features"]) for t in row[self.dict_column]) res_idx = [self.vocab.get_pad_index()] * (num_labels * num_tokens) res_weights = [0.0] * (num_labels * num_tokens) res_lens = [1] * num_tokens for dict_feature in row[self.dict_column]: idx = dict_feature["tokenIdx"] feats = dict_feature["features"] pos = idx * num_labels res_lens[idx] = len(feats) # write values at the correct pos for label, weight in feats.items(): res_idx[pos] = self.vocab.lookup_all(label) res_weights[pos] = weight pos += 1 return res_idx, res_weights, res_lens
[docs] def tensorize(self, batch): # Pad a minibatch of dictionary features to be # batch_size * max_number_of_words * max_number_of_features # unpack the minibatch feats, weights, lengths = zip(*batch) lengths_flattened = [li for l_list in lengths for li in l_list] seq_lens = [len(l_list) for l_list in lengths] max_ex_len = precision.pad_length(max(seq_lens)) max_feat_len = max(lengths_flattened) all_lengths, all_feats, all_weights = [], [], [] for i, seq_len in enumerate(seq_lens): ex_feats, ex_weights, ex_lengths = [], [], [] feats_lengths, feats_vals, feats_weights = lengths[i], feats[i], weights[i] max_feat_len_example = max(feats_lengths) r_offset = 0 for _ in feats_lengths: # The dict feats obtained from the featurizer will have necessary # padding at the utterance level. Therefore we move the offset by # max feature length in the example. ex_feats.extend(feats_vals[r_offset : r_offset + max_feat_len_example]) ex_feats.extend( [self.vocab.get_pad_index()] * (max_feat_len - max_feat_len_example) ) ex_weights.extend( feats_weights[r_offset : r_offset + max_feat_len_example] ) ex_weights.extend([0.0] * (max_feat_len - max_feat_len_example)) r_offset += max_feat_len_example ex_lengths.extend(feats_lengths) # Pad examples ex_padding = (max_ex_len - seq_len) * max_feat_len ex_feats.extend([self.vocab.get_pad_index()] * ex_padding) ex_weights.extend([0.0] * ex_padding) ex_lengths.extend([1] * (max_ex_len - seq_len)) all_feats.append(ex_feats) all_weights.append(ex_weights) all_lengths.append(ex_lengths) return ( cuda.tensor(all_feats, torch.long), precision.maybe_half(cuda.tensor(all_weights, torch.float)), cuda.tensor(all_lengths, torch.long), )
[docs]class SeqTokenTensorizer(Tensorizer): """ Tensorize a sequence of sentences. The input is a list of strings, like this one: :: ["where do you wanna meet?", "MPK"] if we assume this vocab :: vocab { UNK: 0, PAD: 1, 'where': 2, 'do': 3, 'you': 4, 'wanna': 5, 'meet?': 6, 'mpk': 7 } this example will result in those tensors: :: idx = [[2, 3, 4, 5, 6], [7, 1, 1, 1, 1]] sentence_len = [5, 1] seq_len = [2] If you're using BOS, EOS, BOL and EOL, the vocab will look like this :: vocab { UNK: 0, PAD: 1, BOS: 2, EOS: 3, BOL: 4, EOL: 5 'where': 6, 'do': 7, 'you': 8, 'wanna': 9, 'meet?': 10, 'mpk': 11 } this example will result in those tensors: :: idx = [ [2, 4, 3, 1, 1, 1, 1], [2, 6, 7, 8, 9, 10, 3], [2, 11, 3, 1, 1, 1, 1], [2, 5, 3, 1, 1, 1, 1] ] sentence_len = [3, 8, 3, 3] seq_len = [4] """
[docs] class Config(Tensorizer.Config): column: str = "text_seq" # this is actually the max token count, it's named seq_len beause the variable is used in _tokenize # function from TokenTensorizer max_seq_len: Optional[int] = None #: sentence markers add_bos_token: bool = False add_eos_token: bool = False use_eos_token_for_bos: bool = False #: list markers add_bol_token: bool = False add_eol_token: bool = False use_eol_token_for_bol: bool = False #: The tokenizer to use to split input text into tokens. tokenizer: Tokenizer.Config = Tokenizer.Config() # the max number of turns in one example max_turn: int = 50
[docs] @classmethod def from_config(cls, config: Config): tokenizer = create_component(ComponentType.TOKENIZER, config.tokenizer) return cls( column=config.column, tokenizer=tokenizer, add_bos_token=config.add_bos_token, add_eos_token=config.add_eos_token, use_eos_token_for_bos=config.use_eos_token_for_bos, add_bol_token=config.add_bol_token, add_eol_token=config.add_eol_token, use_eol_token_for_bol=config.use_eol_token_for_bol, max_seq_len=config.max_seq_len, is_input=config.is_input, max_turn=config.max_turn, )
def __init__( self, column: str = Config.column, tokenizer=None, add_bos_token: bool = Config.add_bos_token, add_eos_token: bool = Config.add_eos_token, use_eos_token_for_bos: bool = Config.use_eos_token_for_bos, add_bol_token: bool = Config.add_bol_token, add_eol_token: bool = Config.add_eol_token, use_eol_token_for_bol: bool = Config.use_eol_token_for_bol, max_seq_len=Config.max_seq_len, vocab=None, is_input: bool = Config.is_input, max_turn=50, ): self.column = column self.tokenizer = tokenizer or Tokenizer() self.vocab = vocab self.vocab_builder = None self.add_bos_token = add_bos_token self.add_eos_token = add_eos_token self.use_eos_token_for_bos = use_eos_token_for_bos self.add_bol_token = add_bol_token self.add_eol_token = add_eol_token self.use_eol_token_for_bol = use_eol_token_for_bol # this is actually the max token count, it's named seq_len beause the variable is used in _tokenize # function from TokenTensorizer self.max_seq_len = max_seq_len or 2 ** 30 # large number self.max_turn = max_turn super().__init__(is_input) @property def column_schema(self): return [(self.column, List[str])]
[docs] def initialize(self, vocab_builder=None, from_scratch=True): """Build vocabulary based on training corpus.""" if self.vocab and from_scratch: return if not self.vocab_builder: self.vocab_builder = vocab_builder or VocabBuilder() self.vocab_builder.use_bos = self.add_bos_token self.vocab_builder.use_eos = self.add_eos_token self.vocab_builder.use_bol = self.add_bol_token self.vocab_builder.use_eol = self.add_eol_token try: while True: row = yield for raw_text in row[self.column]: tokenized = self.tokenizer.tokenize(raw_text) self.vocab_builder.add_all([t.value for t in tokenized]) except GeneratorExit: self.vocab = self.vocab_builder.make_vocab()
_lookup_tokens = TokenTensorizer._lookup_tokens _tokenize = TokenTensorizer._tokenize
[docs] def numberize(self, row): """Tokenize, look up in vocabulary.""" return self._process(row, raw_token_output=False)
[docs] def prepare_input(self, row): """Tokenize, return tokenized_texts in raw text""" seq, sen_lens, seq_lens = self._process(row, raw_token_output=True) # convert all special tokens to str return [[str(token) for token in sen] for sen in seq], sen_lens, seq_lens
def _process(self, row, raw_token_output): sentence_process_fn = ( self._tokenize if raw_token_output else self._lookup_tokens ) pad_token = ( self.vocab.pad_token if raw_token_output else self.vocab.get_pad_index() ) seq = [] if self.add_bol_token: bol = EOL if self.use_eol_token_for_bol else BOL tokens, _, _ = sentence_process_fn(pre_tokenized=[Token(bol, -1, -1)]) seq.append(list(tokens)) for raw_text in row[self.column][: self.max_turn]: tokens, _, _ = sentence_process_fn(raw_text) seq.append(list(tokens)) if self.add_eol_token: tokens, _, _ = sentence_process_fn(pre_tokenized=[Token(EOL, -1, -1)]) seq.append(list(tokens)) max_len = max(len(sentence) for sentence in seq) sentence_lens = [] for sentence in seq: sen_len = len(sentence) sentence_lens.append(sen_len) pad_len = max_len - sen_len if pad_len: sentence += [pad_token] * pad_len return seq, sentence_lens, len(seq)
[docs] def tensorize(self, batch): tokens, sentence_lens, seq_lens = zip(*batch) return ( pad_and_tensorize(tokens, self.vocab.get_pad_index()), # pad with len of 1, because 0 will cause issue in LSTM pad_and_tensorize(sentence_lens, 1), pad_and_tensorize(seq_lens, 0), )
[docs] def sort_key(self, row): # sort by seq_len first, then max sentence len return row[2] + row[1] / self.max_turn
[docs]class AnnotationNumberizer(Tensorizer): """ Not really a Tensorizer (since it does not create tensors) but technically serves the same function. This class parses Annotations in the format below and extracts the actions (type List[List[int]]) :: [IN:GET_ESTIMATED_DURATION How long will it take to [SL:METHOD_TRAVEL drive ] from [SL:SOURCE Chicago ] to [SL:DESTINATION Mississippi ] ] Extraction algorithm is handled by Annotation class. We only care about the list of actions, which before vocab index lookups would look like: :: [ IN:GET_ESTIMATED_DURATION, SHIFT, SHIFT, SHIFT, SHIFT, SHIFT, SHIFT, SL:METHOD_TRAVEL, SHIFT, REDUCE, SHIFT, SL:SOURCE, SHIFT, REDUCE, SHIFT, SL:DESTINATION, SHIFT, REDUCE, ] """
[docs] class Config(Tensorizer.Config): column: str = "seqlogical"
[docs] @classmethod def from_config(cls, config: Config): return cls(column=config.column, is_input=config.is_input)
def __init__( self, column: str = Config.column, vocab=None, is_input: bool = Config.is_input ): self.column = column self.vocab = vocab self.vocab_builder = None super().__init__(is_input) @property def column_schema(self): return [(self.column, str)]
[docs] def initialize(self, vocab_builder=None, from_scratch=True): """Build vocabulary based on training corpus.""" if self.vocab and from_scratch: return if not self.vocab_builder: self.vocab_builder = vocab_builder or VocabBuilder() self.vocab_builder.use_unk = False self.vocab_builder.use_pad = False try: while True: row = yield annotation = Annotation(row[self.column]) actions = annotation.tree.to_actions() self.vocab_builder.add_all(actions) except GeneratorExit: self.vocab = self.vocab_builder.make_vocab() self.shift_idx = self.vocab.idx[SHIFT] self.reduce_idx = self.vocab.idx[REDUCE] def filterVocab(fn): return [token for nt, token in self.vocab.idx.items() if fn(nt)] self.ignore_subNTs_roots = filterVocab(is_unsupported) self.valid_NT_idxs = filterVocab(is_valid_nonterminal) self.valid_IN_idxs = filterVocab(is_intent_nonterminal) self.valid_SL_idxs = filterVocab(is_slot_nonterminal)
[docs] def numberize(self, row): """Tokenize, look up in vocabulary.""" annotation = Annotation(row[self.column]) return self.vocab.lookup_all(annotation.tree.to_actions())
[docs] def tensorize(self, batch): return batch
[docs]class MetricTensorizer(Tensorizer): """A tensorizer which use other tensorizers' numerized data. Used mostly for metric reporting."""
[docs] class Config(Tensorizer.Config): names: List[str] indexes: List[int] # Indicate if it can be used to generate input Tensors for prediction is_input: bool = False
[docs] @classmethod def from_config(cls, config: Config): return cls(config.names, config.indexes, config.is_input)
def __init__( self, names: List[str], indexes: List[int], is_input: bool = Config.is_input ): self.names = names self.indexes = indexes super().__init__(is_input)
[docs] def numberize(self, row): # metric tensorizer will depends on other tensorizers' numeric result return None
[docs] def tensorize(self, batch): raise NotImplementedError
[docs]class NtokensTensorizer(MetricTensorizer): """A tensorizer which will reference another tensorizer's numerized data to calculate the num tokens. Used for calculating tokens per second."""
[docs] def tensorize(self, batch): ntokens = 0 for name, index in zip(self.names, self.indexes): ntokens += sum((sample[index] for sample in batch[name])) return ntokens
[docs]class FloatTensorizer(Tensorizer): """A tensorizer for reading in scalars from the data."""
[docs] class Config(Tensorizer.Config): #: The name of the column to parse from the data source. column: str
[docs] @classmethod def from_config(cls, config: Config): return cls(config.column, config.is_input)
def __init__(self, column: str, is_input: bool = Config.is_input): self.column = column super().__init__(is_input) @property def column_schema(self): return [(self.column, float)]
[docs] def numberize(self, row): return row[self.column]
[docs] def tensorize(self, batch): return cuda.tensor(batch, torch.float)
[docs]class FloatListSeqTensorizer(Tensorizer): """Numberize numeric labels.""" __TENSORIZER_SCRIPT_IMPL__ = ScriptFloatListSeqTensorizer
[docs] class Config(Tensorizer.Config): #: The name of the label column to parse from the data source. column: str error_check: bool = False dim: Optional[int] = None pad_token: float = -1.0
[docs] @classmethod def from_config(cls, config: Config): return cls( config.column, config.error_check, config.dim, config.pad_token, config.is_input, )
def __init__( self, column: str, error_check: bool, dim: Optional[int], pad_token: float = Config.pad_token, is_input: bool = Config.is_input, ): self.column = column self.error_check = error_check self.dim = dim self.pad_token = pad_token assert not self.error_check or self.dim is not None, "Error check requires dim" super().__init__(is_input) @property def column_schema(self): return [(self.column, List[List[float]])]
[docs] def numberize(self, row): floatSeq_features = row[self.column] if self.error_check: for dense in floatSeq_features: assert ( len(dense) == self.dim ), f"Dense feature didn't match expected dimension {self.dim}: {dense}" return floatSeq_features, len(floatSeq_features)
[docs] def tensorize(self, batch): float_lists, lens = zip(*batch) padded_and_tensorized_float_lists = pad_and_tensorize( float_lists, pad_token=self.pad_token, dtype=torch.float ) return (padded_and_tensorized_float_lists, pad_and_tensorize(lens))
@lazy_property def tensorizer_script_impl(self): return ScriptFloatListSeqTensorizer(self.pad_token)
[docs]class String2DListTensorizerScriptImpl(TensorizerScriptImpl): def __init__( self, vocab: Vocabulary, ): super().__init__() self.vocab = ScriptVocabulary( list(vocab), pad_idx=vocab.get_pad_index(), )
[docs] def numberize( self, tokens: List[List[str]] ) -> Tuple[List[List[int]], List[int], int]: token_indices: List[List[int]] = self.vocab.lookup_indices_2d(tokens) token_lengths: List[int] = [] for idx in range(len(token_indices)): token_lengths.append(len(token_indices[idx])) return token_indices, token_lengths, len(token_indices)
[docs] def tensorize( self, tokens_3d: List[List[List[int]]], seq_lens_2d: List[List[int]], seq_lens_1d: List[int], ) -> Tuple[torch.Tensor, torch.Tensor]: padded_batch, _ = pad_3d( batch=tokens_3d, tokens_lengths=seq_lens_2d, pad_idx=self.vocab.pad_idx ) return ( torch.tensor(padded_batch, dtype=torch.long), torch.tensor(seq_lens_1d, dtype=torch.long), )
[docs] def forward( self, inputs: List[List[List[str]]] ) -> Tuple[torch.Tensor, torch.Tensor]: tokens_3d: List[List[List[int]]] = [] seq_lens_2d: List[List[int]] = [] seq_lens_1d: List[int] = [] for idx in range(len(inputs)): numberized: Tuple[List[List[int]], List[int], int] = self.numberize( inputs[idx] ) tokens_3d.append(numberized[0]) seq_lens_2d.append(numberized[1]) seq_lens_1d.append(numberized[2]) return self.tensorize(tokens_3d, seq_lens_2d, seq_lens_1d)
[docs]class String2DListTensorizer(Tensorizer): __TENSORIZER_SCRIPT_IMPL__ = String2DListTensorizerScriptImpl
[docs] class Config(Tensorizer.Config): #: The name of the text column to parse from the data source. column: str = "text" vocab: VocabConfig = VocabConfig() vocab_file_delimiter: str = " "
[docs] @classmethod def from_config(cls, config: Config): return cls( column=config.column, vocab_config=config.vocab, vocab_file_delimiter=config.vocab_file_delimiter, is_input=config.is_input, )
def __init__( self, column, vocab_config=None, vocab=None, vocab_file_delimiter=" ", is_input=Config.is_input, ): self.column = column self.vocab = vocab self.vocab_builder = None self.vocab_config = vocab_config or VocabConfig() self.vocab_file_delimiter = vocab_file_delimiter super().__init__(is_input) @lazy_property def tensorizer_script_impl(self): return self.__TENSORIZER_SCRIPT_IMPL__(vocab=self.vocab) @property def column_schema(self): return [(self.column, str)]
[docs] def initialize(self, from_scratch=True): self.vocab_builder = VocabBuilder(delimiter=self.vocab_file_delimiter) if self.vocab_config.build_from_data: try: while True: row = yield self.vocab_builder.add_all(chain.from_iterable(row[self.column])) except GeneratorExit: pass self.vocab_builder.truncate_to_vocab_size( self.vocab_config.size_from_data, self.vocab_config.min_counts ) elif self.vocab_config.vocab_files is not None: try: # PyText will call this initializer with all the rows, but we don't actually need that while True: row = yield except GeneratorExit: pass # Okay, we finally got to do our thing for vocab_file in self.vocab_config.vocab_files: with as f: self.vocab_builder.add_from_file( f, vocab_file.skip_header_line, vocab_file.lowercase_tokens, vocab_file.size_limit, ) else: raise ValueError( f"To create token tensorizer for '{self.column}', either " f"`build_from_data` or `vocab_files` must be set." ) self.vocab = self.vocab_builder.make_vocab()
[docs] def numberize(self, row): return self.tensorizer_script_impl.numberize(row[self.column])
[docs] def tensorize(self, batch): ( token_indices_tensor, seq_lens_1d, ) = self.tensorizer_script_impl.tensorize_wrapper(*zip(*batch)) return ( cuda.tensor(token_indices_tensor, dtype=torch.long), cuda.tensor(seq_lens_1d, dtype=torch.long), )
[docs]def initialize_tensorizers(tensorizers, data_source, from_scratch=True): """A utility function to stream a data source to the initialize functions of a dict of tensorizers.""" initializers = [] for init in [ tensorizer.initialize(from_scratch=from_scratch) if hasattr(tensorizer, "vocab") else tensorizer.initialize() for tensorizer in tensorizers.values() ]: try: init.send(None) # kick initializers.append(init) except StopIteration: pass if initializers: for row in data_source: for init in initializers: init.send(row)