Source code for pytext.data.test.tensorizers_test

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

import unittest
from typing import List

import numpy as np
import pandas as pd
import torch
from pytext.common.constants import SpecialTokens
from pytext.data.bert_tensorizer import BERTTensorizer, BERTTensorizerScriptImpl
from pytext.data.roberta_tensorizer import (
    RoBERTaTensorizer,
    RoBERTaTensorizerScriptImpl,
)
from pytext.data.sources import SquadDataSource
from pytext.data.sources.data_source import Gazetteer, SafeFileWrapper, load_float_list
from pytext.data.sources.pandas import SessionPandasDataSource
from pytext.data.sources.tsv import SessionTSVDataSource, TSVDataSource
from pytext.data.squad_for_bert_tensorizer import (
    SquadForBERTTensorizer,
    SquadForRoBERTaTensorizer,
)
from pytext.data.squad_tensorizer import SquadTensorizer
from pytext.data.tensorizers import (
    AnnotationNumberizer,
    ByteTensorizer,
    ByteTokenTensorizer,
    CharacterVocabTokenTensorizer,
    Float1DListTensorizer,
    FloatListSeqTensorizer,
    FloatListTensorizer,
    GazetteerTensorizer,
    Integer1DListTensorizer,
    LabelListTensorizer,
    LabelTensorizer,
    SeqTokenTensorizer,
    String2DListTensorizer,
    TokenTensorizer,
    VocabConfig,
    VocabFileConfig,
    initialize_tensorizers,
    lookup_tokens,
)
from pytext.data.token_tensorizer import ScriptBasedTokenTensorizer
from pytext.data.tokenizers import (
    DoNothingTokenizer,
    GPT2BPETokenizer,
    Tokenizer,
    WordPieceTokenizer,
)
from pytext.data.utils import Vocabulary
from pytext.torchscript.utils import ScriptBatchInput
from pytext.utils import precision
from pytext.utils.test import import_tests_module


tests_module = import_tests_module()


class LookupTokensTest(unittest.TestCase):
    def test_lookup_tokens(self):
        text = "let's tokenize this"
        tokenizer = Tokenizer()
        vocab = Vocabulary(text.split() + [SpecialTokens.BOS, SpecialTokens.EOS])
        tokens, start_idx, end_idx = lookup_tokens(
            text, tokenizer=tokenizer, vocab=vocab, bos_token=None, eos_token=None
        )
        self.assertEqual(tokens, [0, 1, 2])
        self.assertEqual(start_idx, (0, 6, 15))
        self.assertEqual(end_idx, (5, 14, 19))

        tokens, start_idx, end_idx = lookup_tokens(
            text,
            tokenizer=tokenizer,
            vocab=vocab,
            bos_token=SpecialTokens.BOS,
            eos_token=SpecialTokens.EOS,
        )
        self.assertEqual(tokens, [3, 0, 1, 2, 4])
        self.assertEqual(start_idx, (-1, 0, 6, 15, -1))
        self.assertEqual(end_idx, (-1, 5, 14, 19, -1))


class ListTensorizersTest(unittest.TestCase):
    def setUp(self):
        self.data = SessionTSVDataSource(
            SafeFileWrapper(tests_module.test_file("seq_tagging_example.tsv")),
            field_names=["session_id", "intent", "goal", "label"],
            schema={"intent": List[str], "goal": List[str], "label": List[str]},
        )

    def test_initialize_list_tensorizers(self):
        tensorizers = {
            "intent": LabelListTensorizer(
                label_column="intent", pad_in_vocab=True, allow_unknown=True
            ),
            "goal": LabelListTensorizer(label_column="goal"),
        }
        initialize_tensorizers(tensorizers, self.data.train)
        self.assertEqual(9, len(tensorizers["intent"].vocab))
        self.assertEqual(7, len(tensorizers["goal"].vocab))

    def test_create_label_list_tensors(self):
        tensorizers = {
            "intent": LabelListTensorizer(
                label_column="intent", pad_in_vocab=True, allow_unknown=True
            )
        }
        initialize_tensorizers(tensorizers, self.data.train)
        tensors = [tensorizers["intent"].numberize(row) for row in self.data.train]
        # test label idx
        self.assertEqual([2, 3], tensors[0][0])
        self.assertEqual([4, 5], tensors[1][0])
        self.assertEqual([6, 7, 8], tensors[2][0])
        # test seq lens
        self.assertEqual(2, tensors[0][1])
        self.assertEqual(2, tensors[1][1])
        self.assertEqual(3, tensors[2][1])
        self.assertEqual(3, len(tensors))
        tensors, lens = tensorizers["intent"].tensorize(tensors)
        np.testing.assert_array_almost_equal(
            np.array([[2, 3, 1], [4, 5, 1], [6, 7, 8]]), tensors.detach().numpy()
        )
        np.testing.assert_array_almost_equal(
            np.array([2, 2, 3]), lens.detach().numpy()
        )

    def test_label_list_tensors_no_pad_in_vocab(self):
        tensorizers = {
            "intent": LabelListTensorizer(
                label_column="intent", pad_in_vocab=False, allow_unknown=True
            )
        }
        initialize_tensorizers(tensorizers, self.data.train)
        self.assertEqual(8, len(tensorizers["intent"].vocab))
        tensors = []
        for row in self.data.train:
            row["intent"].append("unknown")
            tensors.append(tensorizers["intent"].numberize(row))
        tensors, lens = tensorizers["intent"].tensorize(tensors)
        np.testing.assert_array_almost_equal(
            np.array([[1, 2, 0, -1], [3, 4, 0, -1], [5, 6, 7, 0]]),
            tensors.detach().numpy(),
        )

    def test_label_list_tensors_pad_missing(self):
        ds = SessionPandasDataSource(
            test_df=pd.DataFrame(
                # test None and empty case
                {
                    "session_id": [1, 1, 1, 1],
                    "label": ["positive", "negative", None, ""],
                }
            ),
            schema={"label": List[str]},
            id_col="session_id",
        )
        tensorizers = {
            "label": LabelListTensorizer(
                pad_missing=True,
                label_column="label",
                pad_in_vocab=False,
                allow_unknown=False,
            )
        }
        initialize_tensorizers(tensorizers, ds.test)
        self.assertEqual(2, len(tensorizers["label"].vocab))
        # only one row in test data
        label_idx_list, lens = tensorizers["label"].numberize(next(iter(ds.test)))
        self.assertEqual([0, 1, -1, -1], label_idx_list)
        tensorizers["label"].pad_missing = False
        with self.assertRaises(Exception):
            tensorizers["label"].numberize(next(iter(ds.test)))


# fmt: off
EXPECTED_ACTIONS = [
    [0, 1, 1, 2, 1, 3, 3],
    [4, 1, 5, 1, 1, 1, 3, 1, 6, 7, 8, 1, 3, 1, 3, 3, 1, 1, 9, 1, 1, 1, 1, 1, 3, 1, 3],
    [10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3],
    [0, 1, 1, 1, 1, 1, 3],
    [11, 1, 1, 1, 1, 1, 1, 1, 1, 1, 6, 7, 1, 3, 3, 1, 5, 12, 1, 3, 3, 1, 1, 1, 9, 1, 1, 1, 3, 1, 3],
    [4, 1, 1, 1, 1, 5, 1, 3, 1, 1, 13, 1, 1, 1, 3, 3],
    [4, 1, 1, 1, 1, 1, 5, 7, 1, 3, 3, 3],
    [14, 1, 1, 1, 6, 1, 3, 1, 5, 1, 3, 3],
    [0, 1, 1, 1, 1, 1, 2, 1, 3, 3],
    [10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3],
]
# fmt: on


class TensorizersTest(unittest.TestCase):
    def setUp(self):
        self.data = TSVDataSource(
            SafeFileWrapper(tests_module.test_file("train_dense_features_tiny.tsv")),
            SafeFileWrapper(tests_module.test_file("test_dense_features_tiny.tsv")),
            eval_file=None,
            field_names=["label", "slots", "text", "dense"],
            schema={"text": str, "label": str},
        )

    def _initialize_tensorizer(self, tensorizer, data=None):
        if data is None:
            data = self.data
        init = tensorizer.initialize()
        init.send(None)  # kick
        for row in data.train:
            init.send(row)
        init.close()

    def test_initialize_tensorizers(self):
        tensorizers = {
            "tokens": TokenTensorizer(text_column="text"),
            "labels": LabelTensorizer(label_column="label"),
            "chars": ByteTensorizer(text_column="text"),
        }
        initialize_tensorizers(tensorizers, self.data.train)
        self.assertEqual(49, len(tensorizers["tokens"].vocab))
        self.assertEqual(7, len(tensorizers["labels"].vocab))

    def test_initialize_token_tensorizer(self):
        # default (build from data)
        tensorizer = TokenTensorizer(text_column="text")
        self._initialize_tensorizer(tensorizer)
        self.assertEqual(49, len(tensorizer.vocab))

        # size limit on tokens from data
        tensorizer = TokenTensorizer(
            text_column="text", vocab_config=VocabConfig(size_from_data=3)
        )
        self._initialize_tensorizer(tensorizer)
        self.assertEqual(5, len(tensorizer.vocab))  # 3 + unk token + pad token

        embed_file = tests_module.test_file("pretrained_embed_raw")

        # vocab from data + vocab_file
        tensorizer = TokenTensorizer(
            text_column="text",
            vocab_config=VocabConfig(
                size_from_data=3,
                vocab_files=[
                    VocabFileConfig(filepath=embed_file, skip_header_line=True)
                ],
            ),
        )
        self._initialize_tensorizer(tensorizer)
        self.assertEqual(15, len(tensorizer.vocab))

        # vocab just from vocab_file
        tensorizer = TokenTensorizer(
            text_column="text",
            vocab_config=VocabConfig(
                build_from_data=False,
                vocab_files=[
                    VocabFileConfig(
                        filepath=embed_file, skip_header_line=True, size_limit=5
                    )
                ],
            ),
        )
        init = tensorizer.initialize()
        # Should skip initialization
        with self.assertRaises(StopIteration):
            init.send(None)
        self.assertEqual(7, len(tensorizer.vocab))  # 5 + unk token + pad token

    def test_numberize_with_token_tensorizer(self):
        tensorizer = TokenTensorizer(text_column="text")
        self._initialize_tensorizer(tensorizer)

        rows = [{"text": "I want some coffee"}, {"text": "Turn it up"}]
        tensors = (tensorizer.numberize(row) for row in rows)
        tokens, seq_len, token_ranges = next(tensors)
        self.assertEqual([24, 0, 0, 0], tokens)
        self.assertEqual(4, seq_len)
        self.assertEqual([(0, 1), (2, 6), (7, 11), (12, 18)], token_ranges)

        tokens, seq_len, token_ranges = next(tensors)
        self.assertEqual([13, 47, 9], tokens)
        self.assertEqual(3, seq_len)
        self.assertEqual([(0, 4), (5, 7), (8, 10)], token_ranges)

    def test_numberize_with_script_token_tensorizer(self):
        rows = [{"text": "I want some coffee"}, {"text": "Turn it up"}]
        expected_outputs = [
            [
                ("tokens", [24, 0, 0, 0]),
                ("seq_len", 4),
                ("token_ranges", [(0, 1), (2, 6), (7, 11), (12, 18)]),
            ],
            [
                ("tokens", [13, 47, 9]),
                ("seq_len", 3),
                ("token_ranges", [(0, 4), (5, 7), (8, 10)]),
            ],
        ]

        tensorizer = ScriptBasedTokenTensorizer(text_column="text")
        self._initialize_tensorizer(tensorizer)
        tensors = (tensorizer.numberize(row) for row in rows)
        for expected, numberized in zip(expected_outputs, tensors):
            for (field, value), out in zip(expected, numberized):
                self.assertEqual(value, out, msg=f"{field} didn't match!")

        # Test that EOS / BOS tokens are correctly added
        tensorizer = ScriptBasedTokenTensorizer(
            text_column="text", add_eos_token=True, add_bos_token=True
        )
        self._initialize_tensorizer(tensorizer)
        tensors = (tensorizer.numberize(row) for row in rows)

        bos = tensorizer.vocab.get_bos_index()
        eos = tensorizer.vocab.get_eos_index()
        unk = tensorizer.vocab.get_unk_index()

        for expected, (tokens, seq_len, token_ranges) in zip(expected_outputs, tensors):
            # Token indices are shifted due to the two new tokens, hence x + 2
            self.assertEqual(
                [bos] + [x + 2 if x != unk else x for x in expected[0][1]] + [eos],
                tokens,
            )
            # We have one extra token at the beginning and one at the end
            self.assertEqual(expected[1][1] + 2, seq_len)
            # EOS / BOS tokens don't have ranges
            self.assertEqual([(-1, -1)] + expected[2][1] + [(-1, -1)], token_ranges)

    def test_tensorize_with_script_token_tensorizer(self):
        rows = [{"text": "I want some coffee"}, {"text": "Turn it up"}]
        tensorizer = ScriptBasedTokenTensorizer(
            text_column="text", add_eos_token=True, add_bos_token=True
        )
        self._initialize_tensorizer(tensorizer)

        def check_results(results):
            tokens, seq_lens, token_ranges = results
            self.assertIsInstance(tokens, torch.LongTensor)
            self.assertIsInstance(seq_lens, torch.LongTensor)
            self.assertIsInstance(token_ranges, torch.LongTensor)
            self.assertEqual((2, 6), tokens.size())
            self.assertEqual((2,), seq_lens.size())
            self.assertEqual((2, 6, 2), token_ranges.size())
            self.assertEqual(
                [[2, 26, 0, 0, 0, 3], [2, 15, 49, 11, 3, 1]], tokens.tolist()
            )
            self.assertEqual([6, 5], seq_lens.tolist())
            self.assertEqual(
                [
                    [[-1, -1], [0, 1], [2, 6], [7, 11], [12, 18], [-1, -1]],
                    [[-1, -1], [0, 4], [5, 7], [8, 10], [-1, -1], [-1, -1]],
                ],
                token_ranges.tolist(),
            )

        original_results = tensorizer.tensorize(
            [tensorizer.numberize(row) for row in rows]
        )
        check_results(original_results)

        # Make sure all is well after exporting to TorchScript
        torchscript_tensorizer = tensorizer.torchscriptify()

        # The tokenizer we use by default is not torchscriptifiable so we do
        # tokenization beforehand and add positions ourselves.
        tokenized_rows = [
            torchscript_tensorizer.tokenize(
                row_text=None, row_pre_tokenized=row["text"].lower().split(" ")
            )
            for row in rows
        ]
        positions_2d = [
            [(-1, -1), (0, 1), (2, 6), (7, 11), (12, 18), (-1, -1)],
            [(-1, -1), (0, 4), (5, 7), (8, 10), (-1, -1)],
        ]
        tokens_2d = []
        seq_lens_1d = []
        for row in tokenized_rows:
            tokens, seq_len, position = torchscript_tensorizer.numberize(row)
            tokens_2d.append(tokens)
            seq_lens_1d.append(seq_len)

        torchscript_results = torchscript_tensorizer.tensorize(
            tokens_2d, seq_lens_1d, positions_2d
        )
        check_results(torchscript_results)

    def test_create_byte_tensors(self):
        tensorizer = ByteTensorizer(text_column="text", lower=False)
        # not initializing because initializing is a no-op for ByteTensorizer
        s1 = "I want some coffee"
        s2 = "Turn it up"
        s3 = "我不会说中文"
        rows = [{"text": s1}, {"text": s2}, {"text": s3}]
        expected = [list(s1.encode()), list(s2.encode()), list(s3.encode())]
        tensors = [tensorizer.numberize(row) for row in rows]
        self.assertEqual([(bytes, len(bytes)) for bytes in expected], tensors)

    def test_byte_tensors_error_code(self):
        tensorizer = ByteTensorizer(
            text_column="text", lower=False, add_bos_token=True, add_eos_token=True
        )
        s1 = "I want some coffee#"
        s2 = "This is ^the best show I've ever seen"
        rows = [{"text": s1}, {"text": s2}]
        expected_error_code = 1
        with self.assertRaises(SystemExit) as cm:
            for row in rows:
                tensorizer.numberize(row)
        self.assertEqual(cm.exception.code, expected_error_code)

    def test_create_byte_token_tensors(self):
        tensorizer = ByteTokenTensorizer(
            text_column="text", max_seq_len=4, max_byte_len=5
        )
        # not initializing because initializing is a no-op for this tensorizer
        s1 = "I want some coffee today"
        s2 = "Turn it up"

        def ords(word, pad_to):
            return list(word.encode()) + [0] * (pad_to - len(word))

        batch = [{"text": s1}, {"text": s2}]
        # Note that the tokenizer lowercases here
        expected = [
            [ords("i", 5), ords("want", 5), ords("some", 5), ords("coffe", 5)],
            [ords("turn", 5), ords("it", 5), ords("up", 5), ords("", 5)],
        ]
        expected_token_lens = [4, 3]
        expected_byte_lens = [[1, 4, 4, 5], [4, 2, 2, 0]]

        bytes, token_lens, byte_lens = tensorizer.tensorize(
            [tensorizer.numberize(row) for row in batch]
        )
        self.assertIsInstance(bytes, torch.LongTensor)
        self.assertIsInstance(token_lens, torch.LongTensor)
        self.assertIsInstance(byte_lens, torch.LongTensor)
        self.assertEqual((2, 4, 5), bytes.size())
        self.assertEqual((2,), token_lens.size())
        self.assertEqual((2, 4), byte_lens.size())
        self.assertEqual(expected, bytes.tolist())
        self.assertEqual(expected_token_lens, token_lens.tolist())
        self.assertEqual(expected_byte_lens, byte_lens.tolist())

    def test_initialize_label_tensorizer(self):
        tensorizer = LabelTensorizer(label_column="label")
        self._initialize_tensorizer(tensorizer)
        self.assertEqual(7, len(tensorizer.vocab))

    def test_create_label_tensors(self):
        tensorizer = LabelTensorizer(label_column="label")
        self._initialize_tensorizer(tensorizer)
        rows = [
            {"label": "weather/find"},
            {"label": "alarm/set_alarm"},
            {"label": "non/existent"},
        ]
        tensors = (tensorizer.numberize(row) for row in rows)
        tensor = next(tensors)
        self.assertEqual(6, tensor)
        tensor = next(tensors)
        self.assertEqual(1, tensor)
        with self.assertRaises(Exception):
            tensor = next(tensors)

    def test_create_label_tensors_add_labels(self):
        add_labels = ["add_label_1", "add_label_2"]
        tensorizer = LabelTensorizer.from_config(
            LabelTensorizer.Config(add_labels=add_labels)
        )
        self._initialize_tensorizer(tensorizer)
        self.assertEqual(7 + len(add_labels), len(tensorizer.vocab))

    def test_create_label_tensors_label_vocab(self):
        add_labels = ["add_label_1", "add_label_2"]
        label_vocab = ["label_1", "label_2"]
        tensorizer = LabelTensorizer.from_config(
            LabelTensorizer.Config(label_vocab=label_vocab, add_labels=add_labels)
        )
        # skip initialization when using label_vocab.
        with self.assertRaises(StopIteration):
            self._initialize_tensorizer(tensorizer)
        # add_labels are ignored if using label_vocab.
        self.assertEqual(len(label_vocab), len(tensorizer.vocab))

    def test_gazetteer_tensor_bad_json(self):
        tensorizer = GazetteerTensorizer()
        data = TSVDataSource(
            train_file=SafeFileWrapper(
                tests_module.test_file("train_dict_features_bad_json.tsv")
            ),
            test_file=None,
            eval_file=None,
            field_names=["text", "dict"],
            schema={"text": str, "dict": Gazetteer},
        )
        init = tensorizer.initialize()
        init.send(None)  # kick
        with self.assertRaises(Exception):
            for row in data.train:
                init.send(row)
        init.close()

    def test_gazetteer_tensor(self):
        tensorizer = GazetteerTensorizer()
        data = TSVDataSource(
            train_file=SafeFileWrapper(
                tests_module.test_file("train_dict_features.tsv")
            ),
            test_file=None,
            eval_file=None,
            field_names=["text", "dict"],
            schema={"text": str, "dict": Gazetteer},
        )
        self._initialize_tensorizer(tensorizer, data)
        # UNK + PAD + 5 labels
        self.assertEqual(7, len(tensorizer.vocab))

        # only two rows in test file:
        # "Order coffee from Starbucks please"
        # "Order some fries from McDonalds please"
        for i, row in enumerate(data.train):
            if i == 0:
                idx, weights, lens = tensorizer.numberize(row)
                self.assertEqual([1, 1, 2, 3, 1, 1, 4, 1, 1, 1], idx)
                self.assertEqual(
                    [0.0, 0.0, 0.8, 0.2, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], weights
                )
                self.assertEqual([1, 2, 1, 1, 1], lens)
            if i == 1:
                idx, weights, lens = tensorizer.numberize(row)
                self.assertEqual([1, 1, 5, 1, 6, 1], idx)
                self.assertEqual([0.0, 0.0, 1.0, 0.0, 1.0, 0.0], weights)
                self.assertEqual([1, 1, 1, 1, 1, 1], lens)

        feats, weights, lens = tensorizer.tensorize(
            tensorizer.numberize(row) for row in data.train
        )
        self.assertEqual(
            [
                [1, 1, 2, 3, 1, 1, 4, 1, 1, 1, 1, 1],
                [1, 1, 1, 1, 5, 1, 1, 1, 6, 1, 1, 1],
            ],
            feats.numpy().tolist(),
        )
        self.assertEqual(
            str(
                [
                    [0.0, 0.0, 0.8, 0.2, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0],
                    [0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0],
                ]
            ),
            str(
                [[round(w, 2) for w in utt_weights] for utt_weights in weights.numpy()]
            ),
        )
        self.assertEqual(
            [[1, 2, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1]], lens.numpy().tolist()
        )

    def test_seq_tensor(self):
        tensorizer = SeqTokenTensorizer()
        data = TSVDataSource(
            train_file=SafeFileWrapper(
                tests_module.test_file("train_seq_features.tsv")
            ),
            test_file=None,
            eval_file=None,
            field_names=["text_seq"],
            schema={"text_seq": List[str]},
        )
        self._initialize_tensorizer(tensorizer, data)
        # UNK + PAD + 6 tokens
        self.assertEqual(8, len(tensorizer.vocab))

        # only one row in test file:
        # ["where do you wanna meet?", "MPK"]
        for row in data.train:
            tokens, token_lens, seq_lens = tensorizer.prepare_input(row)
            idx, sentence_lens, lens = tensorizer.numberize(row)
            self.assertEqual(2, lens)
            self.assertEqual([[2, 3, 4, 5, 6], [7, 1, 1, 1, 1]], idx)
            self.assertEqual([5, 1], sentence_lens)
            self.assertEqual(2, seq_lens)
            self.assertEqual(
                [
                    ["where", "do", "you", "wanna", "meet?"],
                    [
                        "mpk",
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                    ],
                ],
                tokens,
            )

    def test_seq_tensor_max_turn(self):
        tensorizer = SeqTokenTensorizer(max_turn=1)
        data = TSVDataSource(
            train_file=SafeFileWrapper(
                tests_module.test_file("train_seq_features.tsv")
            ),
            test_file=None,
            eval_file=None,
            field_names=["text_seq"],
            schema={"text_seq": List[str]},
        )
        self._initialize_tensorizer(tensorizer, data)

        # only one row in test file:
        # ["where do you wanna meet?", "MPK"]
        for row in data.train:
            idx, sentence_lens, seq_len = tensorizer.numberize(row)
            self.assertEqual(1, seq_len)
            self.assertEqual([[2, 3, 4, 5, 6]], idx)
            self.assertEqual([5], sentence_lens)

    def test_seq_tensor_pad_batch(self):
        tensorizer = SeqTokenTensorizer()
        data = TSVDataSource(
            train_file=SafeFileWrapper(
                tests_module.test_file("train_seq_features.tsv")
            ),
            test_file=None,
            eval_file=None,
            field_names=["text_seq"],
            schema={"text_seq": List[str]},
        )
        self._initialize_tensorizer(tensorizer, data)
        token_idx_1 = [[2, 3], [2, 1]]
        token_count_1 = [2, 1]
        seq_len_1 = 2
        token_idx_2 = [[2, 3, 4]]
        token_count_2 = [3]
        seq_len_2 = 1
        token_idx_tensor, token_count_tensor, seq_len_tensor = tensorizer.tensorize(
            [
                (token_idx_1, token_count_1, seq_len_1),
                (token_idx_2, token_count_2, seq_len_2),
            ]
        )
        np.testing.assert_array_almost_equal(
            np.array([[[2, 3, 1], [2, 1, 1]], [[2, 3, 4], [1, 1, 1]]]),
            token_idx_tensor.detach().numpy(),
        )
        np.testing.assert_array_almost_equal(
            np.array([[2, 1], [3, 1]]), token_count_tensor.detach().numpy()
        )
        np.testing.assert_array_almost_equal(
            np.array([2, 1]), seq_len_tensor.detach().numpy()
        )

    def test_seq_tensor_with_bos_eos_eol_bol(self):
        tensorizer = SeqTokenTensorizer(
            add_bos_token=True,
            add_eos_token=True,
            add_bol_token=True,
            add_eol_token=True,
        )
        data = TSVDataSource(
            train_file=SafeFileWrapper(
                tests_module.test_file("train_seq_features.tsv")
            ),
            test_file=None,
            eval_file=None,
            field_names=["text_seq"],
            schema={"text_seq": List[str]},
        )
        self._initialize_tensorizer(tensorizer, data)
        # UNK + PAD + BOS + EOS + BOL + EOL + 6 tokens
        self.assertEqual(12, len(tensorizer.vocab))

        # only one row in test file:
        # ["where do you wanna meet?", "MPK"]
        for row in data.train:
            idx, sen_lens, lens = tensorizer.numberize(row)
            tokens, token_lens, seq_lens = tensorizer.prepare_input(row)
            self.assertEqual(4, lens)
            self.assertEqual(4, seq_lens)
            self.assertEqual([3, 7, 3, 3], token_lens)
            self.assertEqual(
                [
                    [2, 4, 3, 1, 1, 1, 1],
                    [2, 6, 7, 8, 9, 10, 3],
                    [2, 11, 3, 1, 1, 1, 1],
                    [2, 5, 3, 1, 1, 1, 1],
                ],
                idx,
            )
            self.assertEqual(
                [
                    [
                        str(SpecialTokens.BOS),
                        str(SpecialTokens.BOL),
                        str(SpecialTokens.EOS),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                    ],
                    [
                        str(SpecialTokens.BOS),
                        "where",
                        "do",
                        "you",
                        "wanna",
                        "meet?",
                        str(SpecialTokens.EOS),
                    ],
                    [
                        str(SpecialTokens.BOS),
                        "mpk",
                        str(SpecialTokens.EOS),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                    ],
                    [
                        str(SpecialTokens.BOS),
                        str(SpecialTokens.EOL),
                        str(SpecialTokens.EOS),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                        str(SpecialTokens.PAD),
                    ],
                ],
                tokens,
            )

    def test_create_float_list_tensor(self):
        tensorizer = FloatListTensorizer(
            column="dense", dim=2, error_check=True, normalize=False
        )
        tests = [
            ("[0.1,0.2]", [0.1, 0.2]),  # comma
            ("[0.1, 0.2]", [0.1, 0.2]),  # comma with single space
            ("[0.1,  0.2]", [0.1, 0.2]),  # comma with multiple spaces
            ("[0.1 0.2]", [0.1, 0.2]),  # space
            ("[0.1  0.2]", [0.1, 0.2]),  # multiple spaces
            ("[ 0.1 0.2]", [0.1, 0.2]),  # space after [
            ("[0.1 0.2 ]", [0.1, 0.2]),  # space before ]
            ("[0. 1.]", [0.0, 1.0]),  # 0., 1.
        ]
        for raw, expected in tests:
            row = {"dense": load_float_list(raw)}
            numberized = tensorizer.numberize(row)
            self.assertEqual(expected, numberized)

        precision.FP16_ENABLED = True
        batch = []
        for raw, _ in tests:
            row = {"dense": load_float_list(raw)}
            batch.append(tensorizer.numberize(row))
        tensor = tensorizer.tensorize(batch)
        self.assertEqual(list(tensor.size()), [8, 2])
        self.assertEqual(tensor.dtype, torch.float16)
        precision.FP16_ENABLED = False

        # Test the case when dim is None
        tensorizer = FloatListTensorizer(
            column="dense", dim=None, error_check=False, normalize=False
        )
        tests = [
            ("[0.1,0.2]", [0.1, 0.2]),  # two elements
            ("[0.1, 0.2, 0.3]", [0.1, 0.2, 0.3]),  # three elements
        ]
        for raw, expected in tests:
            row = {"dense": load_float_list(raw)}
            numberized = tensorizer.numberize(row)
            self.assertEqual(expected, numberized)

    def test_float_list_tensor_prepare_input(self):
        tensorizer = FloatListTensorizer(
            column="dense", dim=2, error_check=True, normalize=False
        )
        tests = [("[0.1,0.2]", [0.1, 0.2])]
        for raw, expected in tests:
            row = {"dense": load_float_list(raw)}
            numberized = tensorizer.prepare_input(row)
            self.assertEqual(expected, numberized)

    def test_create_normalized_float_list_tensor(self):
        def round_list(seq):
            return [float("%.4f" % n) for n in seq]

        data = TSVDataSource(
            SafeFileWrapper(tests_module.test_file("train_dense_features_tiny.tsv")),
            eval_file=None,
            field_names=["label", "slots", "text", "dense_feat"],
            schema={"text": str, "label": str, "dense_feat": List[float]},
        )
        tensorizer = FloatListTensorizer(
            column="dense_feat", dim=10, error_check=True, normalize=True
        )
        self._initialize_tensorizer(tensorizer, data)
        self.assertEqual(10, tensorizer.normalizer.num_rows)
        self.assertEqual(
            round_list(
                [7.56409, 8.2388, 0.5531, 0.2403, 1.03130,
                 6.2888, 3.1595, 0.1538, 0.2403, 5.3463]
            ),
            round_list(tensorizer.normalizer.feature_sums),
        )
        self.assertEqual(
            round_list(
                [5.80172, 7.57586, 0.30591, 0.05774, 0.52762,
                 5.22811, 2.51727, 0.02365, 0.05774, 4.48798]
            ),
            round_list(tensorizer.normalizer.feature_squared_sums),
        )
        self.assertEqual(
            round_list(
                [0.75640, 0.82388, 0.05531, 0.02403, 0.10313,
                 0.62888, 0.31595, 0.01538, 0.02403, 0.53463]
            ),
            round_list(tensorizer.normalizer.feature_avgs),
        )
        self.assertEqual(
            round_list(
                [0.08953, 0.28072, 0.16593, 0.07209, 0.20524,
                 0.35682, 0.38974, 0.04614, 0.07209, 0.40369]
            ),
            round_list(tensorizer.normalizer.feature_stddevs),
        )

        row = [0.64840776, 0.7575, 0.5531, 0.2403, 0, 0.9481, 0, 0.1538, 0.2403, 0.3564]
        output = tensorizer.numberize({"dense_feat": row})
        self.assertEqual(
            round_list(
                [-1.20619, -0.23646, 2.99999, 3.0, -0.50246,
                 0.89462, -0.81066, 2.99999, 3.0, -0.44149]
            ),
            round_list(output),
        )

    def test_create_float_list_seq_tensor(self):
        tensorizer = FloatListSeqTensorizer(column="dense", dim=2, error_check=True)
        tests = [
            (
                ["[0.1,0.2]", "[0.1, 0.2]", "[0.1, 0.2]", "[0.1 0.2]"],
                [[0.1, 0.2], [0.1, 0.2], [0.1, 0.2], [0.1, 0.2]],
                4,
            ),
            (
                ["[0.1 0.2]", "[ 0.1 0.2]", "[0.1 0.2 ]", "[ 0.1 0.2 ]", "[0. 1.]"],
                [[0.1, 0.2], [0.1, 0.2], [0.1, 0.2], [0.1, 0.2], [0.0, 1.0]],
                5,
            ),
        ]
        for raw_list, expected, expected_length in tests:
            row = {"dense": [load_float_list(raw) for raw in raw_list]}
            numberized, numberized_len = tensorizer.numberize(row)
            self.assertEqual(expected, numberized)
            self.assertEqual(expected_length, numberized_len)

        batch = []
        for raw_list, _, _ in tests:
            row = {"dense": [load_float_list(raw) for raw in raw_list]}
            tensor, tensor_len = tensorizer.numberize(row)
            batch.append((tensor, tensor_len))
        tensor, tensor_lens = tensorizer.tensorize(batch)
        self.assertEqual(list(tensor.size()), [2, 5, 2])
        self.assertEqual(tensor.dtype, torch.float)
        self.assertEqual(tensor_lens.tolist(), [4, 5])

    def test_float_list_seq_tensor_prepare_input(self):
        tensorizer = FloatListSeqTensorizer(column="dense", dim=2, error_check=True)
        tests = [
            (
                ["[0.1,0.2]", "[0.1, 0.2]", "[0.1, 0.2]", "[0.1 0.2]"],
                [[0.1, 0.2], [0.1, 0.2], [0.1, 0.2], [0.1, 0.2]],
                4,
            )
        ]
        for raw_list, expected, expect_length in tests:
            row = {"dense": [load_float_list(raw) for raw in raw_list]}
            numberized, numberized_len = tensorizer.prepare_input(row)
            self.assertEqual(expected, numberized)
            self.assertEqual(expect_length, numberized_len)

    def test_annotation_num(self):
        data = TSVDataSource(
            SafeFileWrapper(tests_module.test_file("compositional_seq2seq_unit.tsv")),
            test_file=None,
            eval_file=None,
            field_names=["text", "seqlogical"],
            schema={"text": str, "seqlogical": str},
        )
        nbrz = AnnotationNumberizer()
        self._initialize_tensorizer(nbrz, data)
        # vocab = {'IN:GET_INFO_TRAFFIC': 0, 'SHIFT': 1, 'SL:LOCATION': 2,
        # 'REDUCE': 3, 'IN:GET_DIRECTIONS': 4, 'SL:DESTINATION': 5, 'SL:SOURCE': 6,
        # 'IN:GET_LOCATION_HOME': 7, 'SL:CONTACT': 8, 'SL:DATE_TIME_DEPARTURE': 9,
        # 'IN:UNSUPPORTED_NAVIGATION': 10, 'IN:GET_ESTIMATED_DURATION': 11,
        # 'IN:GET_LOCATION_WORK': 12, 'SL:PATH_AVOID': 13, 'IN:GET_DISTANCE': 14}
        self.assertEqual(15, len(nbrz.vocab))
        self.assertEqual(1, nbrz.shift_idx)
        self.assertEqual(3, nbrz.reduce_idx)
        self.assertEqual([10], nbrz.ignore_subNTs_roots)
        self.assertEqual(
            [0, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], nbrz.valid_NT_idxs
        )
        self.assertEqual([0, 4, 7, 10, 11, 12, 14], nbrz.valid_IN_idxs)
        self.assertEqual([2, 5, 6, 8, 9, 13], nbrz.valid_SL_idxs)

        for row, expected in zip(data.train, EXPECTED_ACTIONS):
            actions = nbrz.numberize(row)
            self.assertEqual(expected, actions)

    def test_integer_1D_list_tensorizer(self):
        rows = [
            {"1d_list": [1, 2, 3]},
            {"1d_list": [10, 20]},
        ]
        expected_output = [
            [1, 2, 3],
            [10, 20, 0],
        ]
        tensorizer = Integer1DListTensorizer.from_config(
            Integer1DListTensorizer.Config(column="1d_list")
        )
        self._initialize_tensorizer(tensorizer)

        numberized_rows = (tensorizer.numberize(row) for row in rows)
        for row, numberized in zip(rows, numberized_rows):
            self.assertEqual(row["1d_list"], numberized)

        tensorized_rows = tensorizer.tensorize(
            [tensorizer.numberize(row) for row in rows]
        )
        assert tensorized_rows.equal(torch.tensor(expected_output, dtype=torch.long))

        # Check the tensorizer after exporting to TorchScript
        torchscript_tensorizer = tensorizer.torchscriptify()
        torchscript_numberized = []
        torchscript_lens = []
        for row in rows:
            numberized, length = torchscript_tensorizer.numberize(row["1d_list"])
            self.assertEqual(numberized, row["1d_list"])
            self.assertEqual(length, len(row["1d_list"]))
            torchscript_numberized.append(numberized)
            torchscript_lens.append(length)
        torchscript_tensorized = torchscript_tensorizer.tensorize(
            torchscript_numberized, torchscript_lens
        )
        assert torchscript_tensorized.equal(
            torch.tensor(expected_output, dtype=torch.long)
        )

    def test_float_1D_list_tensorizer(self):
        rows = [
            {"1d_list": [1.5, 2.5, 3.5]},
            {"1d_list": [10.2, 20.2]},
        ]
        expected_output = [
            [1.5, 2.5, 3.5],
            [10.2, 20.2, 1.0],
        ]
        tensorizer = Float1DListTensorizer.from_config(
            Float1DListTensorizer.Config(column="1d_list")
        )
        self._initialize_tensorizer(tensorizer)

        numberized_rows = (tensorizer.numberize(row) for row in rows)
        for row, numberized in zip(rows, numberized_rows):
            self.assertEqual(row["1d_list"], numberized)

        tensorized_rows = tensorizer.tensorize(
            [tensorizer.numberize(row) for row in rows]
        )
        assert tensorized_rows.equal(torch.tensor(expected_output, dtype=torch.float))

        # Check the tensorizer after exporting to TorchScript
        torchscript_tensorizer = tensorizer.torchscriptify()
        torchscript_numberized = []
        torchscript_lens = []
        for row in rows:
            numberized, length = torchscript_tensorizer.numberize(row["1d_list"])
            self.assertEqual(numberized, row["1d_list"])
            self.assertEqual(length, len(row["1d_list"]))
            torchscript_numberized.append(numberized)
            torchscript_lens.append(length)
        torchscript_tensorized = torchscript_tensorizer.tensorize(
            torchscript_numberized, torchscript_lens
        )
        assert torchscript_tensorized.equal(
            torch.tensor(expected_output, dtype=torch.float)
        )

    def test_float_list_seq_torchscriptify(self):
        rows = [
            {"2d_list": [[1.5, 2.5, 3.5], [1.2, 2.2]]},
            {"2d_list": [[10.2, 20.2], [10.1], [20.1]]},
        ]
        expected_lists = [
            [[1.5, 2.5, 3.5], [1.2, 2.2, 1.0], [1.0, 1.0, 1.0]],
            [[10.2, 20.2, 1.0], [10.1, 1.0, 1.0], [20.1, 1.0, 1.0]],
        ]
        expected_lens = [2, 3]
        tensorizer = FloatListSeqTensorizer.from_config(
            FloatListSeqTensorizer.Config(column="2d_list", pad_token=1.0)
        )

        numberized_rows = (tensorizer.numberize(row) for row in rows)
        for row, numberized in zip(rows, numberized_rows):
            self.assertEqual(row["2d_list"], numberized[0])
            self.assertEqual(len(row["2d_list"]), numberized[1])

        tensorized_lists, tensorized_lens = tensorizer.tensorize(
            [tensorizer.numberize(row) for row in rows]
        )
        assert tensorized_lists.equal(torch.tensor(expected_lists, dtype=torch.float))
        assert tensorized_lens.equal(torch.tensor(expected_lens, dtype=torch.long))

        # Check the tensorizer after exporting to TorchScript
        torchscript_tensorizer = tensorizer.torchscriptify()
        torchscript_numberized = []
        torchscript_lens = []
        for row in rows:
            numberized, length = torchscript_tensorizer.numberize(row["2d_list"])
            self.assertEqual(numberized, row["2d_list"])
            self.assertEqual(length, len(row["2d_list"]))
            torchscript_numberized.append(numberized)
            torchscript_lens.append(length)
        (
            torchscript_tensorized_lists,
            torchscript_tensorized_lens,
        ) = torchscript_tensorizer.tensorize(torchscript_numberized, torchscript_lens)
        assert torchscript_tensorized_lists.equal(
            torch.tensor(expected_lists, dtype=torch.float)
        )
        assert torchscript_tensorized_lens.equal(
            torch.tensor(expected_lens, dtype=torch.long)
        )


class BERTTensorizerTest(unittest.TestCase):
    def test_bert_tensorizer(self):
        sentence = (
            "<SOS> Focus Driving School Mulungushi bus station along Kasuba road,"
            " wamkopeka building. Ndola, Zambia."
        )
        # expected result was obtained offline by running BertModelDataHandler
        expected = [
            101, 133, 278, 217, 135, 175, 287, 766, 462, 100, 379, 182, 459, 334,
            459, 280, 504, 462, 425, 283, 171, 462, 567, 474, 180, 262, 217, 459,
            931, 262, 913, 117, 192, 262, 407, 478, 287, 744, 263, 478, 262, 560,
            119, 183, 282, 287, 843, 117, 195, 262, 407, 931, 566, 119, 102,
        ]
        row = {"text": sentence}
        tensorizer = BERTTensorizer.from_config(
            BERTTensorizer.Config(
                tokenizer=WordPieceTokenizer.Config(
                    wordpiece_vocab_path="pytext/data/test/data/wordpiece_1k.txt"
                )
            )
        )
        tensorizer_impl = BERTTensorizerScriptImpl(
            tokenizer=DoNothingTokenizer(),
            vocab=tensorizer.vocab,
            max_seq_len=tensorizer.max_seq_len,
        ).torchscriptify()

        tokens, segment_label, seq_len, positions = tensorizer.numberize(row)
        self.assertEqual(tokens, expected)
        self.assertEqual(seq_len, len(expected))
        self.assertEqual(segment_label, [0] * len(expected))

        tokens, pad_mask, segment_labels, _ = tensorizer.tensorize(
            [(tokens, segment_label, seq_len, positions)]
        )
        self.assertEqual(pad_mask[0].tolist(), [1] * len(expected))

        per_sentence_tokens = [tensorizer.tokenizer.tokenize(sentence)]
        tokens, segment_label, seq_len, positions = tensorizer_impl.numberize(
            per_sentence_tokens
        )
        self.assertEqual(tokens, expected)
        self.assertEqual(seq_len, len(expected))
        self.assertEqual(segment_label, [0] * len(expected))

        tokens, pad_mask, segment_labels, _ = tensorizer_impl.tensorize(
            [tokens], [segment_label], [seq_len], [positions]
        )
        self.assertEqual(pad_mask[0].tolist(), [1] * len(expected))

    def test_bert_pair_tensorizer(self):
        sentences = ["Focus", "Driving School"]
        expected_tokens = [101, 175, 287, 766, 462, 102, 100, 379, 102]
        expected_segment_labels = [0, 0, 0, 0, 0, 0, 1, 1, 1]
        row = {"text1": sentences[0], "text2": sentences[1]}
        tensorizer = BERTTensorizer.from_config(
            BERTTensorizer.Config(
                columns=["text1", "text2"],
                tokenizer=WordPieceTokenizer.Config(
                    wordpiece_vocab_path="pytext/data/test/data/wordpiece_1k.txt"
                ),
            )
        )
        tokens, segment_labels, seq_len, _ = tensorizer.numberize(row)
        self.assertEqual(tokens, expected_tokens)
        self.assertEqual(segment_labels, expected_segment_labels)
        self.assertEqual(seq_len, len(expected_tokens))


class RobertaTensorizerTest(unittest.TestCase):
    def test_roberta_tensorizer(self):
        text = "Prototype"
        tokens = [[0, 4, 5, 2]]
        pad_masks = [[1, 1, 1, 1]]
        segment_labels = [[0, 0, 0, 0]]
        positions = [[0, 1, 2, 3]]
        expected = [tokens, pad_masks, segment_labels, positions]

        tensorizer = RoBERTaTensorizer.from_config(
            RoBERTaTensorizer.Config(
                tokenizer=GPT2BPETokenizer.Config(
                    bpe_encoder_path="pytext/data/test/data/gpt2_encoder.json",
                    bpe_vocab_path="pytext/data/test/data/gpt2_vocab.bpe",
                ),
                vocab_file="pytext/data/test/data/gpt2_dict.txt",
                max_seq_len=256,
            )
        )
        tensors = tensorizer.tensorize([tensorizer.numberize({"text": text})])
        for tensor, expect in zip(tensors, expected):
            self.assertEqual(tensor.tolist(), expect)

        tensorizer_impl = RoBERTaTensorizerScriptImpl(
            tokenizer=DoNothingTokenizer(),
            vocab=tensorizer.vocab,
            max_seq_len=tensorizer.max_seq_len,
        )
        script_tensorizer_impl = tensorizer_impl.torchscriptify()
        per_sentence_tokens = [tensorizer.tokenizer.tokenize(text)]
        tokens_2d, segment_labels_2d, seq_lens_1d, positions_2d = zip(
            *[script_tensorizer_impl.numberize(per_sentence_tokens)]
        )
        script_tensors = script_tensorizer_impl.tensorize(
            tokens_2d, segment_labels_2d, seq_lens_1d, positions_2d
        )
        for tensor, expect in zip(script_tensors, expected):
            self.assertEqual(tensor.tolist(), expect)

        # test that torchscriptify can be called multiple times
        tensorizer_impl.torchscriptify()


class SquadForRobertaTensorizerTest(unittest.TestCase):
    def test_squad_roberta_tensorizer(self):
        row = {
            "id": 0,
            "doc": "Prototype",
            "question": "otype",
            "answers": ["Prot"],
            "answer_starts": [0],
            "has_answer": True,
        }
        tensorizer = SquadForRoBERTaTensorizer.from_config(
            SquadForRoBERTaTensorizer.Config(
                tokenizer=GPT2BPETokenizer.Config(
                    bpe_encoder_path="pytext/data/test/data/gpt2_encoder.json",
                    bpe_vocab_path="pytext/data/test/data/gpt2_vocab.bpe",
                ),
                vocab_file="pytext/data/test/data/gpt2_dict.txt",
                max_seq_len=250,
            )
        )
        tokens, segments, seq_len, positions, start, end = tensorizer.numberize(row)
        # check against manually verified answer positions in tokenized output
        self.assertEqual(start, [3])
        self.assertEqual(end, [3])
        self.assertEqual(len(tokens), seq_len)
        self.assertEqual(len(segments), seq_len)


class SquadForBERTTensorizerTest(unittest.TestCase):
    def test_squad_tensorizer(self):
        source = SquadDataSource.from_config(
            SquadDataSource.Config(
                eval_filename=tests_module.test_file("squad_tiny.json")
            )
        )
        row = next(iter(source.eval))
        tensorizer = SquadForBERTTensorizer.from_config(
            SquadForBERTTensorizer.Config(
                tokenizer=WordPieceTokenizer.Config(
                    wordpiece_vocab_path="pytext/data/test/data/wordpiece_1k.txt"
                ),
                max_seq_len=250,
            )
        )
        tokens, segments, seq_len, positions, start, end = tensorizer.numberize(row)
        # check against manually verified answer positions in tokenized output
        # there are 4 identical answers
        self.assertEqual(start, [83, 83, 83, 83])
        self.assertEqual(end, [87, 87, 87, 87])
        self.assertEqual(len(tokens), seq_len)
        self.assertEqual(len(segments), seq_len)

        tensorizer.max_seq_len = 50
        # answer should be truncated out
        _, _, _, _, start, end = tensorizer.numberize(row)
        self.assertEqual(start, [-100, -100, -100, -100])
        self.assertEqual(end, [-100, -100, -100, -100])
        self.assertEqual(len(tokens), seq_len)
        self.assertEqual(len(segments), seq_len)


class SquadTensorizerTest(unittest.TestCase):
    def setUp(self):
        self.json_data_source = SquadDataSource.from_config(
            SquadDataSource.Config(
                train_filename=tests_module.test_file("squad_tiny.json"),
                eval_filename=None,
                test_filename=None,
            )
        )
        self.tsv_data_source = SquadDataSource.from_config(
            SquadDataSource.Config(
                train_filename=tests_module.test_file("squad_tiny.tsv"),
                eval_filename=None,
                test_filename=None,
            )
        )
        self.tensorizer_with_wordpiece = SquadTensorizer.from_config(
            SquadTensorizer.Config(
                tokenizer=WordPieceTokenizer.Config(
                    wordpiece_vocab_path="pytext/data/test/data/wordpiece_1k.txt"
                ),
                max_seq_len=250,
            )
        )
        self.tensorizer_with_alphanumeric = SquadTensorizer.from_config(
            SquadTensorizer.Config(
                tokenizer=Tokenizer.Config(split_regex=r"\W+"), max_seq_len=250
            )
        )

    def _init_tensorizer(self, tsv=False):
        tensorizer_dict = {
            "wordpiece": self.tensorizer_with_wordpiece,
            "alphanumeric": self.tensorizer_with_alphanumeric,
        }
        data_source = (
            self.tsv_data_source.train if tsv else self.json_data_source.train
        )
        initialize_tensorizers(tensorizer_dict, data_source)

    def test_initialize(self):
        self._init_tensorizer()
        self.assertEqual(len(self.tensorizer_with_wordpiece.vocab), 1000)
        self.assertEqual(
            len(self.tensorizer_with_wordpiece.ques_tensorizer.vocab), 1000
        )
        self.assertEqual(len(self.tensorizer_with_wordpiece.doc_tensorizer.vocab), 1000)
        self.assertEqual(len(self.tensorizer_with_alphanumeric.vocab), 1418)
        self.assertEqual(
            len(self.tensorizer_with_alphanumeric.ques_tensorizer.vocab), 1418
        )
        self.assertEqual(
            len(self.tensorizer_with_alphanumeric.doc_tensorizer.vocab), 1418
        )

    def test_numberize_with_alphanumeric(self):
        self._init_tensorizer()
        row = next(iter(self.json_data_source.train))
        (
            doc_tokens,
            doc_seq_len,
            ques_tokens,
            ques_seq_len,
            answer_start_token_idx,
            answer_end_token_idx,
        ) = self.tensorizer_with_alphanumeric.numberize(row)
        # check against manually verified answer positions in tokenized output
        # there are 4 identical answers
        self.assertEqual(len(ques_tokens), ques_seq_len)
        self.assertEqual(len(doc_tokens), doc_seq_len)
        self.assertEqual(ques_tokens, [2, 3, 4, 5, 6, 7])  # It's a coincidence.
        self.assertEqual(answer_start_token_idx, [26, 26, 26, 26])
        self.assertEqual(answer_end_token_idx, [26, 26, 26, 26])

        self.tensorizer_with_alphanumeric.doc_tensorizer.max_seq_len = 20
        # answer should be truncated out because max doc len is smaller.
        (
            doc_tokens,
            doc_seq_len,
            ques_tokens,
            ques_seq_len,
            answer_start_token_idx,
            answer_end_token_idx,
        ) = self.tensorizer_with_alphanumeric.numberize(row)
        self.assertEqual(len(ques_tokens), ques_seq_len)
        self.assertEqual(len(doc_tokens), doc_seq_len)
        self.assertEqual(answer_start_token_idx, [-100])
        self.assertEqual(answer_end_token_idx, [-100])

    def test_numberize_with_wordpiece(self):
        self._init_tensorizer()
        row = next(iter(self.json_data_source.train))
        (
            doc_tokens,
            doc_seq_len,
            ques_tokens,
            ques_seq_len,
            answer_start_token_idx,
            answer_end_token_idx,
        ) = self.tensorizer_with_wordpiece.numberize(row)
        # check against manually verified answer positions in tokenized output
        # there are 4 identical answers
        self.assertEqual(len(ques_tokens), ques_seq_len)
        self.assertEqual(len(doc_tokens), doc_seq_len)
        self.assertEqual(answer_start_token_idx, [70, 70, 70, 70])
        self.assertEqual(answer_end_token_idx, [74, 74, 74, 74])

        self.tensorizer_with_wordpiece.doc_tensorizer.max_seq_len = 50
        # answer should be truncated out because max doc len is smaller.
        (
            doc_tokens,
            doc_seq_len,
            ques_tokens,
            ques_seq_len,
            answer_start_token_idx,
            answer_end_token_idx,
        ) = self.tensorizer_with_wordpiece.numberize(row)
        self.assertEqual(len(ques_tokens), ques_seq_len)
        self.assertEqual(len(doc_tokens), doc_seq_len)
        self.assertEqual(answer_start_token_idx, [-100])
        self.assertEqual(answer_end_token_idx, [-100])

    def test_tsv_numberize_with_alphanumeric(self):
        # No need to repeat other tests with TSV.
        # All we want to test is that TSV and JSON loading are identical.
        self._init_tensorizer(tsv=True)
        row = next(iter(self.json_data_source.train))
        print(row)
        (
            doc_tokens,
            doc_seq_len,
            ques_tokens,
            ques_seq_len,
            answer_start_token_idx,
            answer_end_token_idx,
        ) = self.tensorizer_with_alphanumeric.numberize(row)
        # check against manually verified answer positions in tokenized output
        # there are 4 identical answers
        self.assertEqual(len(ques_tokens), ques_seq_len)
        self.assertEqual(len(doc_tokens), doc_seq_len)
        self.assertEqual(ques_tokens, [2, 3, 4, 5, 6, 7])  # It's a coincidence.
        self.assertEqual(answer_start_token_idx, [26, 26, 26, 26])
        self.assertEqual(answer_end_token_idx, [26, 26, 26, 26])

        self.tensorizer_with_alphanumeric.doc_tensorizer.max_seq_len = 20
        # answer should be truncated out because max doc len is smaller.
        (
            doc_tokens,
            doc_seq_len,
            ques_tokens,
            ques_seq_len,
            answer_start_token_idx,
            answer_end_token_idx,
        ) = self.tensorizer_with_alphanumeric.numberize(row)
        self.assertEqual(len(ques_tokens), ques_seq_len)
        self.assertEqual(len(doc_tokens), doc_seq_len)
        self.assertEqual(answer_start_token_idx, [-100])
        self.assertEqual(answer_end_token_idx, [-100])


class String2DListTensorizerTest(unittest.TestCase):
    init_rows = [
        {"text": [["Move", "fast"], ["And", "break", "things", "fast"]]},
    ]
    test_rows = [
        {"text": [["Move", "fast"], ["And", "break", "things", "fast"]]},
        {"text": [["Move", "fast"], ["And", "break", "things", "even", "faster"]]},
    ]
    expected_numberized = [
        ([[2, 3], [4, 5, 6, 3]], [2, 4], 2),
        ([[2, 3], [4, 5, 6, 0, 0]], [2, 5], 2),
    ]
    expected_tensorized = (
        torch.tensor(
            [
                [
                    [2, 3, 1, 1, 1],
                    [4, 5, 6, 3, 1],
                ],
                [
                    [2, 3, 1, 1, 1],
                    [4, 5, 6, 0, 0],
                ],
            ]
        ),
    )

    def _initialize_tensorizer(self, tensorizer):
        init = tensorizer.initialize()
        init.send(None)
        for row in self.init_rows:
            init.send(row)
        init.close()

    # Test the tensorizer in its original form
    def test_original(self):
        tensorizer = String2DListTensorizer(column="text")
        self._initialize_tensorizer(tensorizer)

        numberized_rows = [tensorizer.numberize(row) for row in self.test_rows]
        for expected, actual in zip(self.expected_numberized, numberized_rows):
            self.assertEqual(expected, actual, msg="Numberized rows didn't match!")

        tensors = tensorizer.tensorize(numberized_rows)
        for expected, actual in zip(self.expected_tensorized, tensors):
            print("COMPARE: ", expected, actual)
            self.assertTrue(expected.equal(actual), msg="Tensors didn't match!")

    # Test the TorchScript version of the tensorizer
    def test_torchscriptified(self):
        tensorizer = String2DListTensorizer(column="text")
        self._initialize_tensorizer(tensorizer)
        ts_tensorizer = tensorizer.torchscriptify()

        tensors = ts_tensorizer([row["text"] for row in self.test_rows])
        for expected, actual in zip(self.expected_tensorized, tensors):
            self.assertTrue(expected.equal(actual), msg="Tensors didn't match!")


class CharacterVocabTokenTensorizerTest(unittest.TestCase):
    def _initialize_tensorizer(self, tensorizer, data):
        init = tensorizer.initialize()
        init.send(None)  # kick
        for row in data:
            init.send(row)
        init.close()

    def test_character_vocab_token_tensorizer(self):
        rows = [{"text": "Move fast"}, {"text": "And break things"}]
        expected_outputs = [
            [
                ("char_tokens", [[2, 3, 4, 5], [6, 7, 8, 9]]),
                ("char_tokens_lengths", [4, 4]),
            ],
            [
                (
                    "char_tokens",
                    [[7, 10, 11], [12, 13, 5, 7, 14], [9, 15, 16, 10, 17, 8]],
                ),
                ("char_tokens_lengths", [3, 5, 6]),
            ],
        ]
        expected_tensors = (
            [
                [
                    [2, 3, 4, 5, 1, 1],
                    [6, 7, 8, 9, 1, 1],
                    [1, 1, 1, 1, 1, 1],
                ],
                [
                    [7, 10, 11, 1, 1, 1],
                    [12, 13, 5, 7, 14, 1],
                    [9, 15, 16, 10, 17, 8],
                ],
            ],
            [[4, 4, 0], [3, 5, 6]],
        )

        tensorizer = CharacterVocabTokenTensorizer(text_column="text")
        self._initialize_tensorizer(tensorizer, data=rows)

        numberized_rows = [tensorizer.numberize(row) for row in rows]
        for expected, numberized_row in zip(expected_outputs, numberized_rows):
            for (field, value), out in zip(expected, numberized_row):
                self.assertEqual(value, out, msg=f"{field} didn't match!")

        tensor, tensor_lens = tensorizer.tensorize(numberized_rows)
        self.assertIsInstance(tensor, torch.LongTensor)
        self.assertIsInstance(tensor_lens, torch.LongTensor)
        self.assertEqual(tensor.tolist(), expected_tensors[0])
        self.assertEqual(tensor_lens.tolist(), expected_tensors[1])

        # Check the tensorizer after exporting to TorchScript
        torchscript_tensorizer = tensorizer.torchscriptify()
        torchscript_numberized_rows = [
            torchscript_tensorizer.numberize(
                *torchscript_tensorizer.tokenize(row["text"])
            )
            for row in rows
        ]
        for expected, numberized_row in zip(
            expected_outputs, torchscript_numberized_rows
        ):
            for (field, value), out in zip(expected, numberized_row):
                self.assertEqual(
                    value,
                    out,
                    msg=f"{field} didn't match for torchscriptified tensorizer!",
                )

        torchscript_tensor, torchscript_tensor_lens = torchscript_tensorizer.tensorize(
            *zip(*numberized_rows)
        )
        self.assertIsInstance(torchscript_tensor, torch.LongTensor)
        self.assertIsInstance(torchscript_tensor_lens, torch.LongTensor)
        self.assertEqual(torchscript_tensor.tolist(), expected_tensors[0])
        self.assertEqual(torchscript_tensor_lens.tolist(), expected_tensors[1])

        # test forward
        torchscript_tensor, torchscript_tensor_lens = torchscript_tensorizer(
            ScriptBatchInput([[row["text"]] for row in rows], None, None)
        )
        self.assertIsInstance(torchscript_tensor, torch.LongTensor)
        self.assertIsInstance(torchscript_tensor_lens, torch.LongTensor)
        self.assertEqual(torchscript_tensor.tolist(), expected_tensors[0])
        self.assertEqual(torchscript_tensor_lens.tolist(), expected_tensors[1])