#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import functools
import itertools
import math
import random
from typing import Any, Dict, Iterable, List, MutableMapping, NamedTuple, Optional, Type
from pytext.common.constants import RawExampleFieldName, Stage
from pytext.config.component import Component, ComponentType, Registry, create_component
from pytext.utils.usage import log_class_usage
from .sources import DataSource, RawExample, TSVDataSource
from .sources.data_source import (
GeneratorIterator,
RowShardedDataSource,
ShardedDataSource,
)
from .tensorizers import MetricTensorizer, Tensorizer, initialize_tensorizers
class RowData(NamedTuple):
    """A single example paired with its numberized form.

    Instances are produced by ``Data.numberize_rows`` and consumed by the
    batchers, which unzip them into the two parallel fields below.
    """

    # The row exactly as it was read from the data source.
    raw_data: RawExample
    # Mapping of tensorizer name -> that tensorizer's ``numberize(row)`` output.
    numberized: RawExample
class BatchData(NamedTuple):
    """A group of examples, before padding and tensorization.

    Yielded by the batchers' ``batchify`` methods and unpacked by
    ``pad_and_tensorize_batches``.
    """

    # The raw rows that make up this batch, in batch order.
    raw_data: List[RawExample]
    # Per-key lists of numberized values, as produced by ``zip_dicts``.
    numberized: Dict[str, List[Any]]
class Batcher(Component):
    """Batcher designed to batch rows of data, before padding."""

    __COMPONENT_TYPE__ = ComponentType.BATCHER
    __EXPANSIBLE__ = True

    class Config(Component.Config):
        #: Make batches of this size when possible. If there's not enough data,
        #: might generate some smaller batches.
        train_batch_size: int = 16
        eval_batch_size: int = 16
        test_batch_size: int = 16

    @classmethod
    def from_config(cls, config: Config):
        """Build a batcher from the three per-stage batch sizes in `config`."""
        return cls(
            config.train_batch_size, config.eval_batch_size, config.test_batch_size
        )

    def __init__(
        self,
        train_batch_size=Config.train_batch_size,
        eval_batch_size=Config.eval_batch_size,
        test_batch_size=Config.test_batch_size,
    ):
        self.train_batch_size = train_batch_size
        self.eval_batch_size = eval_batch_size
        self.test_batch_size = test_batch_size
        # Lookup table used by batchify() to pick the batch size for a stage.
        self._batch_sizes = {
            Stage.TRAIN: self.train_batch_size,
            Stage.TEST: self.test_batch_size,
            Stage.EVAL: self.eval_batch_size,
        }

    def batchify(
        self, iterable: Iterable[RawExample], sort_key=None, stage=Stage.TRAIN
    ):
        """Group rows into batches of the size configured for `stage`.

        Each item of `iterable` is expected to unpack into a
        ``(raw, numberized)`` pair (e.g. a ``RowData``). Yields one
        ``BatchData`` per group, holding the tuple of raw rows and the
        numberized dicts merged by ``zip_dicts``.

        When the number of rows is not a multiple of the batch size, the last
        batch holds the remainder. If `sort_key` is given, the rows inside
        each batch are sorted by it in descending order.
        """
        batch_size = self._batch_sizes[stage]
        for batch in self._group_iter(iterable, batch_size, sort_key):
            raw_batch, numberized_batch = zip(*batch)
            yield BatchData(raw_batch, zip_dicts(numberized_batch))

    def _group_iter(self, iterable: Iterable[RawExample], group_size, sort_key=None):
        """Yield consecutive lists of up to `group_size` items from `iterable`.

        Items are pulled lazily with ``itertools.islice``, so memory use is
        bounded by the number of items actually read rather than by
        `group_size` (which may be far larger than the dataset when pooling).

        ``None`` items are dropped from each group. When `sort_key` is truthy
        each group is sorted by it, largest key first.
        """
        # A non-positive size cannot form any group; yield nothing.
        if group_size <= 0:
            return
        source = iter(iterable)
        while True:
            chunk = list(itertools.islice(source, group_size))
            if not chunk:
                # Source exhausted: stop without emitting an empty group.
                return
            group = [ex for ex in chunk if ex is not None]
            if sort_key:
                group.sort(key=sort_key, reverse=True)
            yield group
class PoolingBatcher(Batcher):
    """
    Batcher that shuffles and (if requested) sorts data.

    **Rationale**

    Truly random batches and efficiently padded batches pull in opposite
    directions. Padding is minimised when inputs of similar length always share
    a batch, but that makes batches far less random. Maximising randomness
    instead tends to put inputs of wildly different lengths in the same batch,
    which costs a lot of padding.

    **Operation**

    The batcher works in stages:

    1. A "super pool" of `num_shuffled_pools` pools is loaded and shuffled.
    2. The shuffled data is cut sequentially into individual pools, and the
       examples of each pool are sorted (if a sort key was requested).
    3. Each pool is cut sequentially into batches, which are yielded. When
       sorting was requested in step 2, the batches of a pool are yielded in
       random order.

    A pool holds `pool_num_batches` batches worth of examples.

    **Examples**

    With sorting enabled and the defaults `pool_num_batches: 1000`,
    `num_shuffled_pools: 1`: `1k * batch_size` examples are loaded, sorted by
    length and cut into 1k batches, which are yielded in random order; then the
    next pool is loaded, and so on. Padding is somewhat reduced, but in every
    epoch the first 1k batches are always the same (albeit in a different
    order).

    With `pool_num_batches: 1000` and `num_shuffled_pools: 1000`:
    `1k * 1k * batch_size` examples are loaded and shuffled, then cut into
    pools of `1k * batch_size` examples that are each sorted, cut into batches
    and yielded in random order. The first 1k batches now differ between
    epochs, at the price of holding 1M batches worth of examples in memory.
    """

    class Config(Batcher.Config):
        #: Size of a pool expressed in number of batches
        pool_num_batches: int = 1000
        #: How many pool-sized chunks to load at a time for shuffling
        num_shuffled_pools: int = 1

    @classmethod
    def from_config(cls, config: Config):
        """Build a pooling batcher from the batch sizes and pool settings."""
        sizes = (
            config.train_batch_size,
            config.eval_batch_size,
            config.test_batch_size,
        )
        return cls(*sizes, config.pool_num_batches, config.num_shuffled_pools)

    def __init__(
        self,
        train_batch_size=Config.train_batch_size,
        eval_batch_size=Config.eval_batch_size,
        test_batch_size=Config.test_batch_size,
        pool_num_batches=Config.pool_num_batches,
        num_shuffled_pools=Config.num_shuffled_pools,
    ):
        super().__init__(train_batch_size, eval_batch_size, test_batch_size)
        # Both multipliers must be at least one for a pool to hold any data.
        assert pool_num_batches >= 1
        assert num_shuffled_pools >= 1
        self.pool_num_batches = pool_num_batches
        self.num_shuffled_pools = num_shuffled_pools

    def get_batch_size(self, stage: Stage) -> int:
        """Return the batch size configured for `stage`."""
        return self._batch_sizes[stage]

    def batchify(
        self, iterable: Iterable[RawExample], sort_key=None, stage=Stage.TRAIN
    ):
        """
        From an iterable of (raw, numberized) rows, yield `BatchData`:

        1. Load `num_shuffled_pools` pools worth of rows and shuffle them.
        2. Cut out one pool (`batch_size * pool_num_batches` rows).
        3. Sort the rows of the pool when a sort key is given.
        4. When sorted, randomise the order in which the batches are yielded.
        """
        batch_size = self.get_batch_size(stage)
        rows_per_pool = batch_size * self.pool_num_batches
        rows_per_super_pool = rows_per_pool * self.num_shuffled_pools

        for super_pool in self._group_iter(iterable, rows_per_super_pool, None):
            # A lone pool that is about to be sorted gains nothing from a
            # shuffle, so only that combination skips it.
            lone_sorted_pool = (
                sort_key is not None and not self.num_shuffled_pools > 1
            )
            if not lone_sorted_pool:
                random.shuffle(super_pool)

            for pool in self._group_iter(super_pool, rows_per_pool, sort_key):
                # First row of every batch inside this pool.
                batch_starts = list(range(0, len(pool), batch_size))
                if sort_key:
                    random.shuffle(batch_starts)
                for start in batch_starts:
                    rows = pool[start : start + batch_size]
                    raw_batch, numberized_batch = zip(*rows)
                    yield BatchData(raw_batch, zip_dicts(numberized_batch))
def pad_and_tensorize_batches(tensorizers, batches):
    """Turn (raw_batch, numberized_batch) pairs into (raw_batch, tensors) pairs.

    For every batch, each tensorizer in `tensorizers` contributes one entry to
    the tensor dict under its own name. A ``MetricTensorizer`` receives the
    whole numberized batch; any other tensorizer receives only the column
    stored under its name.
    """
    for raw_examples, numberized in batches:
        tensors = {
            name: tensorizer.tensorize(
                numberized
                if isinstance(tensorizer, MetricTensorizer)
                else numberized[name]
            )
            for name, tensorizer in tensorizers.items()
        }
        yield raw_examples, tensors
def zip_dicts(dicts):
    """Merge an iterable of dicts into a single dict of lists.

    The result has one entry per key seen in any input dict. Each value is a
    list with one element per input dict, in input order, holding that dict's
    value for the key or ``None`` when the dict lacks it.

    Keys appear in the result in the order they are first encountered. The
    input may be any iterable, including a one-shot generator.
    """
    # Materialise first: the input is walked once to collect the keys and
    # once per key to collect values, which would exhaust a generator.
    rows = list(dicts)
    # dict.fromkeys de-duplicates while keeping first-seen order, so the key
    # order of the result does not depend on string hash randomisation.
    all_keys = dict.fromkeys(itertools.chain.from_iterable(rows))
    return {key: [row.get(key) for row in rows] for key in all_keys}
def generator_iterator(fn):
    """Wrap a generator function so that its result can be iterated repeatedly.

    The returned function does not run `fn` itself; it hands `fn` together
    with the call arguments to ``GeneratorIterator``. Per the original
    documentation, this lets the generator be called anew with the recorded
    arguments each time ``__iter__`` is called on the returned object.
    """

    def make_iterator(*call_args, **call_kwargs):
        return GeneratorIterator(fn, *call_args, **call_kwargs)

    # Carry over fn's name, docstring and other metadata onto the wrapper.
    return functools.update_wrapper(make_iterator, fn)
class Data(Component):
    """Data is an abstraction that handles all of the following:

    - Initialize model metadata parameters
    - Create batches of tensors for model training or prediction

    It can accomplish these in any way it needs to. The base implementation
    utilizes `pytext.data.sources.DataSource`, and sends batches to
    `pytext.data.tensorizers.Tensorizer` to create tensors.

    The `tensorizers` dict passed to the initializer should be considered something like
    a signature for the model. Each batch should be a dictionary with the same keys
    as the `tensorizers` dict, and values should be tensors arranged in the way
    specified by that tensorizer. The tensorizers dict doubles as a simple baseline
    implementation of that same signature, but subclasses of Data can override the
    implementation using other methods. This value is how the model specifies what
    inputs it's looking for.
    """

    __COMPONENT_TYPE__ = ComponentType.DATA_HANDLER
    __EXPANSIBLE__ = True

    class Config(Component.Config):
        #: Specify where training/test/eval data come from. The default value
        #: will not provide any data.
        source: DataSource.Config = TSVDataSource.Config()
        #: How training examples are split into batches for the optimizer.
        batcher: Batcher.Config = PoolingBatcher.Config()
        sort_key: Optional[str] = None
        #: cache numberized result in memory, turn off when CPU memory bound.
        in_memory: Optional[bool] = True

    @classmethod
    def from_config(
        cls,
        config: Config,
        schema: Dict[str, Type],
        tensorizers: Dict[str, Tensorizer],
        rank=0,
        world_size=1,
        init_tensorizers=True,
        **kwargs,
    ):
        """Create the data source and batcher described by `config`.

        A data source whose class is already a ``ShardedDataSource`` is
        created with `rank` and `world_size` directly; any other data source
        is wrapped in a ``RowShardedDataSource``. Extra keyword arguments are
        forwarded to the constructor.
        """
        data_source_cls = Registry.get(ComponentType.DATA_SOURCE, type(config.source))
        if issubclass(data_source_cls, ShardedDataSource):
            # data source is already sharded, we don't need to wrap RowShardedDataSource
            data_source = create_component(
                ComponentType.DATA_SOURCE,
                config.source,
                schema,
                rank=rank,
                world_size=world_size,
            )
        else:
            unsharded_data_source = create_component(
                ComponentType.DATA_SOURCE, config.source, schema
            )
            data_source = RowShardedDataSource(
                data_source=unsharded_data_source, rank=rank, world_size=world_size
            )

        batcher = create_component(ComponentType.BATCHER, config.batcher)
        return cls(
            data_source,
            tensorizers,
            batcher=batcher,
            sort_key=config.sort_key,
            in_memory=config.in_memory,
            init_tensorizers=init_tensorizers,
            **kwargs,
        )

    def __init__(
        self,
        data_source: DataSource,
        tensorizers: Dict[str, Tensorizer],
        batcher: Optional[Batcher] = None,
        sort_key: Optional[str] = None,
        in_memory: Optional[bool] = True,
        init_tensorizers: Optional[bool] = True,
        init_tensorizers_from_scratch: Optional[bool] = True,
    ):
        """This function should also initialize the passed in tensorizers with
        metadata they need for model construction."""
        self.data_source = data_source
        self.tensorizers = tensorizers
        # Fall back to a plain Batcher with default batch sizes.
        self.batcher = batcher or Batcher()
        self.sort_key = sort_key
        self.in_memory = in_memory
        # Fully numberized rows per stage, filled in by cache().
        self.numberized_cache: MutableMapping[str, Any] = {}
        # Stages for which a caching pass is in progress or has completed.
        self.cache_mutex: Dict[str, bool] = {}
        # Tensorizers are initialized from the unsharded training data when
        # the source is sharded.
        full_train_data = (
            data_source.train_unsharded
            if isinstance(data_source, ShardedDataSource)
            else data_source.train
        )
        if init_tensorizers:
            initialize_tensorizers(
                self.tensorizers, full_train_data, init_tensorizers_from_scratch
            )
        else:
            print(
                "Skipped initializing tensorizers since they are loaded from a "
                "previously saved state."
            )
        log_class_usage(__class__)

    def numberize_rows(self, rows):
        """Yield a ``RowData`` for each row, numberized by every tensorizer."""
        for row in rows:
            numberized = {
                name: tensorizer.numberize(row)
                for name, tensorizer in self.tensorizers.items()
            }
            yield RowData(row, numberized)

    def cache(self, numberized_rows, stage):
        """Pass `numberized_rows` through, recording them for `stage`.

        Only one pass per stage records rows. While a stage is marked in
        ``cache_mutex``, further calls just forward their rows. Once the
        recording pass has consumed every row, the collected list is stored
        in ``numberized_cache[stage]``.

        If the recording pass is abandoned or fails before reaching the end,
        the stage's marker is released so that a later pass can retry.
        Otherwise the stage would stay marked forever without a cache entry
        ever being written.
        """
        if stage in self.cache_mutex:
            # already have generator caching the numberized data
            yield from numberized_rows
            return

        self.cache_mutex[stage] = True
        result = []
        finished = False
        try:
            for numberized_row in numberized_rows:
                result.append(numberized_row)
                yield numberized_row
            self.numberized_cache[stage] = result
            finished = True
        finally:
            if not finished:
                # Incomplete pass: nothing was cached, allow another attempt.
                self.cache_mutex.pop(stage, None)

    def add_row_indices(self, rows):
        """Store each row's position under ``RawExampleFieldName.ROW_INDEX``.

        The rows are modified in place and yielded in their original order.
        """
        for idx, row in enumerate(rows):
            row[RawExampleFieldName.ROW_INDEX] = idx
            yield row

    @generator_iterator
    def batches(self, stage: Stage, data_source=None, load_early=False):
        """Create batches of tensors to pass to model train_batch.

        This function yields dictionaries that mirror the `tensorizers` dict passed to
        `__init__`, ie. the keys will be the same, and the tensors will be the shape
        expected from the respective tensorizers.

        `stage` is used to determine which data source is used to create batches.
        If `data_source` is provided, it is used instead of the configured data_source;
        this is to allow setting a different data_source for testing a model.

        Passing in `load_early` = True disables loading all data in memory and using
        PoolingBatcher, so that we get the first batch as quickly as possible.
        """
        data_source = data_source if data_source is not None else self.data_source
        rows = {
            Stage.TRAIN: data_source.train,
            Stage.TEST: data_source.test,
            Stage.EVAL: data_source.eval,
        }[stage]

        # We add row indices here so that the original order can be reproduced
        # after shuffling the data if necessary.
        indexed_rows = self.add_row_indices(rows)

        # rows and numberized_rows are generators which can iterate over large
        # datasets; be careful not to do any operations which will expend them.
        if self.in_memory and not load_early:
            numberized_rows = self.numberized_cache.get(stage, None)
            if numberized_rows is None:
                numberized_rows = self.cache(self.numberize_rows(indexed_rows), stage)
            else:
                print(f"Get numberized rows from cache in stage: {stage}")
        else:
            numberized_rows = self.numberize_rows(indexed_rows)
        sort_key = self.sort_key

        def key(row):
            # Delegate to the tensorizer named by sort_key, applied to that
            # tensorizer's own numberized output for the row.
            return self.tensorizers[sort_key].sort_key(row.numberized[sort_key])

        if load_early:
            # Use a plain Batcher with the same batch sizes instead of the
            # configured batcher.
            batcher = Batcher(
                self.batcher.train_batch_size,
                self.batcher.eval_batch_size,
                self.batcher.test_batch_size,
            )
        else:
            batcher = self.batcher
        batches = batcher.batchify(
            numberized_rows, sort_key=(key if sort_key else None), stage=stage
        )
        return pad_and_tensorize_batches(self.tensorizers, batches)