diff --git a/merlin/dataloader/array.py b/merlin/dataloader/array.py
deleted file mode 100644
index 0e4369df..00000000
--- a/merlin/dataloader/array.py
+++ /dev/null
@@ -1,275 +0,0 @@
-#
-# Copyright (c) 2021, NVIDIA CORPORATION.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import contextlib
-import itertools
-
-from merlin.core.compat import cupy as cp
-from merlin.core.compat import numpy as np
-from merlin.core.dispatch import annotate
-from merlin.dataloader.loader_base import LoaderBase
-
-
-class ArrayLoaderBase(LoaderBase):
-    """Base class containing common functionality between the PyTorch and TensorFlow dataloaders."""
-
-    def _peek_next_batch(self):
-        """Return next batch without advancing the iterator."""
-        if self._workers is None:
-            ArrayLoaderBase.__iter__(self)
-
-        # get the first chunks
-        if self._batch_itr is None:
-            self._fetch_chunk()
-
-        # try to iterate through existing batches
-        try:
-            batch = next(self._batch_itr)
-            self._batch_itr = itertools.chain([batch], self._batch_itr)
-
-        except StopIteration:
-            # anticipate any more chunks getting created
-            # if not, raise the StopIteration
-            if not self._working and self._buff.empty:
-                self._workers = None
-                self._batch_itr = None
-                raise
-
-            # otherwise get the next chunks and return
-            # the first batch
-            self._fetch_chunk()
-            batch = next(self._batch_itr)
-            self._batch_itr = itertools.chain([batch], self._batch_itr)
-
-        return batch
-
-    def _get_next_batch(self):
-        """
-        adding this cheap shim so that we can call this
-        step without it getting overridden by the
-        framework-specific parent class's `__next__` method.
-        TODO: can this be better solved with a metaclass
-        implementation?
-        My gut is that we don't actually
-        necessarily *want*, in general, to be overriding
-        __next__ and __iter__ methods
-        """
-        # we've never initialized, do that now
-        # need this because tf.keras.Model.fit will
-        # call next() cold
-        if self._workers is None:
-            ArrayLoaderBase.__iter__(self)
-
-        # get the first chunks
-        if self._batch_itr is None:
-            self._fetch_chunk()
-
-        # try to iterate through existing batches
-        try:
-            batch = next(self._batch_itr)
-        except StopIteration:
-            # anticipate any more chunks getting created
-            # if not, raise the StopIteration
-            if not self._working and self._buff.empty:
-                self._workers = None
-                self._batch_itr = None
-                raise
-
-            # otherwise get the next chunks and return
-            # the first batch
-            self._fetch_chunk()
-            batch = next(self._batch_itr)
-        # if batch[0] is empty but other exist
-        for sub in batch:
-            if sub is not None and len(sub) > 0:
-                self.num_rows_processed += len(sub)
-                break
-        return batch
-
-    def _split_values(self, tensor, values_per_batch, axis=0):
-        # splits are like offsets but without the first and last entry
-        splits = list(itertools.accumulate(values_per_batch))[:-1]
-        return self.array_lib().split(tensor, splits, axis=axis)
-
-    def _subtract_offsets(self, offsets_grouped_by_batch):
-        subtracted_offsets_grouped_by_batch = []
-        for idx, batch_offsets in enumerate(offsets_grouped_by_batch):
-            if idx != 0:
-                previous_batch_offsets = offsets_grouped_by_batch[idx - 1]
-                batch_offsets = batch_offsets - previous_batch_offsets[-1]
-            subtracted_offsets_grouped_by_batch.append(batch_offsets)
-        return subtracted_offsets_grouped_by_batch
-
-    @annotate("make_tensors", color="darkgreen", domain="merlin_dataloader")
-    def make_tensors(self, gdf, use_row_lengths=False):
-        """Yields batches of tensors from a dataframe
-
-        Parameters
-        ----------
-        gdf : DataFrame
-            A dataframe type object.
-        use_row_lengths : bool, optional
-            Enable using row lengths instead of offsets for list columns, by default False
-
-        Returns
-        -------
-        Dict[Tensors]
-            A dictionary of the column tensor representations.
- - """ - tensors_by_name = self._convert_df_to_tensors(gdf) - rows_per_batch = self._get_rows_per_batch(len(gdf)) - - tensor_batches = {} - - for tensor_key, tensor_value in tensors_by_name.items(): - if isinstance(tensor_value, tuple): - # List feature - full_tensor_values, full_tensor_offsets = tensor_value - - splits = list(itertools.accumulate(rows_per_batch)) - - offsets_grouped_by_batch = [] - if splits: - for idx, split in enumerate([0] + splits[:-1]): - start = split - end = splits[idx] + 1 - offsets_grouped_by_batch.append(full_tensor_offsets[start:end]) - - subtracted_offsets_grouped_by_batch = self._subtract_offsets( - offsets_grouped_by_batch - ) - num_values_per_batch = [ - int(batch_offsets[-1]) for batch_offsets in subtracted_offsets_grouped_by_batch - ] - - batch_values = self._split_values(full_tensor_values, num_values_per_batch) - tensor_batches[tensor_key] = { - "values": batch_values, - "offsets": subtracted_offsets_grouped_by_batch, - } - else: - # Scalar feature - num_values_per_batch = rows_per_batch - tensor_batches[tensor_key] = self._split_values(tensor_value, num_values_per_batch) - - for batch_idx in range(len(rows_per_batch)): - batch = {} - for tensor_key in tensors_by_name: - tensor_value = tensor_batches[tensor_key] - if isinstance(tensor_value, dict): - full_tensor_values = tensor_value["values"][batch_idx] - offsets = tensor_value["offsets"][batch_idx] - batch[tensor_key] = full_tensor_values, offsets - else: - batch[tensor_key] = tensor_value[batch_idx] - - yield self._process_batch(batch) - - -class ArrayLoader(ArrayLoaderBase): - """ - NumPy/CuPy Array dataloader - - Parameters - ------------- - dataset: merlin.io.Dataset - The dataset to load - batch_size: int - Number of rows to yield at each iteration - shuffle: bool, default True - Whether to shuffle chunks of batches before iterating through them. - seed_fn: callable - Function used to initialize random state - parts_per_chunk: int - Number of dataset partitions with size dictated by `buffer_size` - to load and concatenate asynchronously. More partitions leads to - better epoch-level randomness but can negatively impact throughput - global_size: int, optional - When doing distributed training, this indicates the number of total processes that are - training the model. - global_rank: - When doing distributed training, this indicates the local rank for the current process. - drop_last: bool, default False - Whether or not to drop the last batch in an epoch. This is useful when you need to - guarantee that each batch contains exactly `batch_size` rows - since the last batch - will usually contain fewer rows. - """ - - def array_lib(self): - if self.device != "cpu": - return cp - else: - return np - - def __len__(self): - """Number of batches in the dataloader.""" - return ArrayLoaderBase.__len__(self) - - def __getitem__(self, index): - """Gets batch at position `index`. - - Note: This returns the next batch in the iterator. - Not the batch at position `index`. - This is because the dataloader is implemented as an iterator and - don't currently support fetching a batch by index. 
- """ - return ArrayLoaderBase.__next__(self) - - @contextlib.contextmanager - def _get_device_ctx(self, dev): - yield dev - - def _split_fn(self, tensor, idx, axis=0): - splits = list(itertools.accumulate(idx))[:-1] - return self.array_lib().split(tensor, splits, axis=axis) - - def _tensor_split(self, tensor, idx, axis=0): - return self.array_lib().split(tensor, idx, axis=axis) - - def _to_tensor(self, df_or_series): - if df_or_series.empty: - return - - # if you have one series in a dataframe pull that series out - # otherwise you will add a dimension to the column [[1,2,3]] - try: - if len(df_or_series.columns) == 1: - df_or_series = df_or_series.iloc[:, 0] - except AttributeError: - pass - - if self.device == "cpu": - tensor = df_or_series.to_numpy() - else: - tensor = df_or_series.to_cupy() - - return tensor - - def _cast_to_numpy_dtype(self, dtype): - return dtype - - def _to_sparse_tensor(self, values_offset, column_name): - raise NotImplementedError("Sparse support isn't implemented yet for the array dataloader") - - def _reshape_dim(self, tensor): - return self.array_lib().reshape(tensor, [-1]) - - def _row_lengths_to_offsets(self, row_lengths): - zero_value = self.array_lib().array([0], dtype=row_lengths.dtype) - if len(row_lengths.shape) == 2: - zero_value = self.array_lib().expand_dims(zero_value, axis=0) - return self.array_lib().concatenate( - [zero_value, self.array_lib().cumsum(row_lengths)], axis=0 - ) diff --git a/merlin/dataloader/jax.py b/merlin/dataloader/jax.py index 81fea556..195bdc6c 100644 --- a/merlin/dataloader/jax.py +++ b/merlin/dataloader/jax.py @@ -15,8 +15,6 @@ # import contextlib -import jax -import jax.dlpack import jax.numpy as jnp import numpy as np @@ -72,33 +70,6 @@ def _split_fn(self, tensor, idx, axis=0): def _sum(self, tensor): return tensor.sum() - def _to_tensor(self, df_or_series): - if df_or_series.empty: - return - - transpose = False - - # checks necessary because of this bug - # https://github.com/tensorflow/tensorflow/issues/42660 - # same logic as in TF dataloader - if len(df_or_series.shape) == 1 or df_or_series.shape[1] == 1: - dlpack = self._pack(df_or_series) - else: - transpose = True - dlpack = self._pack(df_or_series.values.T) - - x = self._unpack(dlpack) - - if transpose: - x = x.T - - return x - - def _unpack(self, gdf): - if hasattr(gdf, "shape"): - return jax.device_put(gdf) - return jax.dlpack.from_dlpack(gdf) - def _cast_to_numpy_dtype(self, dtype): # jax uses numpy dtypes, so this is kinda easy return dtype diff --git a/merlin/dataloader/loader_base.py b/merlin/dataloader/loader_base.py index e8b507d0..b56770bf 100644 --- a/merlin/dataloader/loader_base.py +++ b/merlin/dataloader/loader_base.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import contextlib import copy import itertools import math @@ -175,6 +176,16 @@ def _set_epochs(self, epochs): self.__buff_len = None self._epochs = epochs + def __getitem__(self, index): + """Gets batch at position `index`. + + Note: This returns the next batch in the iterator. + Not the batch at position `index`. + This is because the dataloader is implemented as an iterator and + don't currently support fetching a batch by index. 
+ """ + return self.__next__() + def __len__(self): batches = _num_steps(self._buff_len, self.batch_size) if self.drop_last and self._buff_len % self.batch_size > 0: @@ -353,38 +364,50 @@ def make_tensors(self, gdf, use_row_lengths=False): A dictionary of the column tensor representations. """ - split_idx = self._get_rows_per_batch(len(gdf)) - - # convert dataframe to framework-specific tensors tensors_by_name = self._convert_df_to_tensors(gdf) + rows_per_batch = self._get_rows_per_batch(len(gdf)) - # split them into batches and map to the framework-specific output format tensor_batches = {} for tensor_key, tensor_value in tensors_by_name.items(): if isinstance(tensor_value, tuple): - values, offsets = tensor_value - row_lengths = offsets[1:] - offsets[:-1] - batch_row_lengths = self._split_fn(row_lengths, split_idx) - values_split_idx = [sum(row_lengths.tolist()) for row_lengths in batch_row_lengths] - batch_values = self._split_fn(values, values_split_idx) + # List feature + full_tensor_values, full_tensor_offsets = tensor_value + + splits = list(itertools.accumulate(rows_per_batch)) + + offsets_grouped_by_batch = [] + if splits: + for idx, split in enumerate([0] + splits[:-1]): + start = split + end = splits[idx] + 1 + offsets_grouped_by_batch.append(full_tensor_offsets[start:end]) + + subtracted_offsets_grouped_by_batch = self._subtract_offsets( + offsets_grouped_by_batch + ) + num_values_per_batch = [ + int(batch_offsets[-1]) for batch_offsets in subtracted_offsets_grouped_by_batch + ] + + batch_values = self._split_values(full_tensor_values, num_values_per_batch) tensor_batches[tensor_key] = { "values": batch_values, - "row_lengths": batch_row_lengths, + "offsets": subtracted_offsets_grouped_by_batch, } else: - tensor_batches[tensor_key] = self._split_fn(tensor_value, split_idx) + # Scalar feature + num_values_per_batch = rows_per_batch + tensor_batches[tensor_key] = self._split_values(tensor_value, num_values_per_batch) - for batch_idx in range(len(split_idx)): + for batch_idx in range(len(rows_per_batch)): batch = {} for tensor_key in tensors_by_name: tensor_value = tensor_batches[tensor_key] if isinstance(tensor_value, dict): - values = tensor_value["values"][batch_idx] - row_partition = tensor_value["row_lengths"][batch_idx] - if not use_row_lengths: - row_partition = self._row_lengths_to_offsets(row_partition) - batch[tensor_key] = values, row_partition + full_tensor_values = tensor_value["values"][batch_idx] + offsets = tensor_value["offsets"][batch_idx] + batch[tensor_key] = full_tensor_values, offsets else: batch[tensor_key] = tensor_value[batch_idx] @@ -408,24 +431,52 @@ def _to_tensor(self, df_or_series): tensor in the appropriate library, with an optional dtype kwarg to do explicit casting if need be """ - raise NotImplementedError + if df_or_series.empty: + return + + # if you have one series in a dataframe pull that series out + # otherwise you will add a dimension to the column [[1,2,3]] + try: + if len(df_or_series.columns) == 1: + df_or_series = df_or_series.iloc[:, 0] + except AttributeError: + pass + + if self.device == "cpu": + tensor = df_or_series.to_numpy() + else: + tensor = df_or_series.to_cupy() + return tensor + + @contextlib.contextmanager def _get_device_ctx(self, dev): """ One of the mandatory functions a child class needs to implement. Maps from a GPU index to a framework context object for placing tensors on specific GPUs """ - raise NotImplementedError + yield dev def _cast_to_numpy_dtype(self, dtype): """ Get the numpy dtype from the framework dtype. 
""" - raise NotImplementedError - - def _split_fn(self, tensor, idx, axis=0): - raise NotImplementedError + raise dtype + + def _split_values(self, tensor, values_per_batch, axis=0): + # splits are like offsets but without the first and last entry + splits = list(itertools.accumulate(values_per_batch))[:-1] + return self.array_lib().split(tensor, splits, axis=axis) + + def _subtract_offsets(self, offsets_grouped_by_batch): + subtracted_offsets_grouped_by_batch = [] + for idx, batch_offsets in enumerate(offsets_grouped_by_batch): + if idx != 0: + previous_batch_offsets = offsets_grouped_by_batch[idx - 1] + batch_offsets = batch_offsets - previous_batch_offsets[-1] + subtracted_offsets_grouped_by_batch.append(batch_offsets) + return subtracted_offsets_grouped_by_batch def _separate_list_columns(self, gdf): lists, scalars = [], [] @@ -436,6 +487,16 @@ def _separate_list_columns(self, gdf): scalars.append(col) return scalars, lists + def array_lib(self): + return self._array_lib + + def _split_fn(self, tensor, idx, axis=0): + splits = list(itertools.accumulate(idx))[:-1] + return self.array_lib().split(tensor, splits, axis=axis) + + def _tensor_split(self, tensor, idx, axis=0): + return self.array_lib().split(tensor, idx, axis=axis) + @annotate("_convert_df_to_tensors", color="darkgreen", domain="merlin_dataloader") def _convert_df_to_tensors(self, gdf): """Convert a dataframe into framework tensors. @@ -516,26 +577,6 @@ def _process_batch(self, tensors): return X, labels - def _pack(self, gdf): - if isinstance(gdf, np.ndarray): - return gdf - # if self.device has value ('cpu') gdf should not be transferred to dlpack - elif ( - hasattr(gdf, "to_dlpack") - and callable(getattr(gdf, "to_dlpack")) - and self.device != "cpu" - ): - return gdf.to_dlpack() - elif hasattr(gdf, "to_numpy") and callable(getattr(gdf, "to_numpy")): - if hasattr(gdf, "columns") and len(gdf.columns) == 1: - values = gdf[gdf.columns[0]].to_numpy() - else: - values = gdf.to_numpy() - if isinstance(values[0], list): - values = np.stack(values) - return values - return gdf.toDlpack() - @property def schema(self) -> Schema: """Get input schema of data to be loaded diff --git a/merlin/dataloader/tensorflow.py b/merlin/dataloader/tensorflow.py index 4e1a6e32..756c3ce0 100644 --- a/merlin/dataloader/tensorflow.py +++ b/merlin/dataloader/tensorflow.py @@ -16,12 +16,12 @@ from functools import partial from merlin.core.compat import tensorflow as tf -from merlin.dataloader.array import ArrayLoader +from merlin.dataloader.loader_base import LoaderBase from merlin.table import TensorColumn, TensorflowColumn, TensorTable from merlin.table.conversions import _dispatch_dlpack_fns, convert_col -class Loader(ArrayLoader, tf.keras.utils.Sequence): +class Loader(LoaderBase, tf.keras.utils.Sequence): def __init__( self, dataset, @@ -64,8 +64,8 @@ def __len__(self): from keras prior to the start of the main loop through the loader. """ - ArrayLoader.stop(self) - return ArrayLoader.__len__(self) + LoaderBase.stop(self) + return LoaderBase.__len__(self) def __getitem__(self, index): """Gets batch at position `index`. 
diff --git a/merlin/dataloader/torch.py b/merlin/dataloader/torch.py
index d6d7c703..4297f558 100644
--- a/merlin/dataloader/torch.py
+++ b/merlin/dataloader/torch.py
@@ -16,12 +16,12 @@
 from functools import partial
 
 from merlin.core.compat import torch as th
-from merlin.dataloader.array import ArrayLoader
+from merlin.dataloader.loader_base import LoaderBase
 from merlin.table import TensorColumn, TensorTable, TorchColumn
 from merlin.table.conversions import _dispatch_dlpack_fns, convert_col
 
 
-class Loader(ArrayLoader, th.utils.data.IterableDataset):
+class Loader(LoaderBase, th.utils.data.IterableDataset):
     def __init__(
         self,
         dataset,
diff --git a/tests/unit/dataloader/test_array_to_tensorflow.py b/tests/unit/dataloader/test_array_to_tensorflow.py
index 1d286251..d17582c7 100644
--- a/tests/unit/dataloader/test_array_to_tensorflow.py
+++ b/tests/unit/dataloader/test_array_to_tensorflow.py
@@ -22,8 +22,8 @@
 from merlin.schema import Tags
 
 pytest.importorskip("tensorflow")
-tf_dataloader = pytest.importorskip("merlin.dataloader.frameworks.tensorflow")
-loader = tf_dataloader.TFArrayDataloader
+tf_dataloader = pytest.importorskip("merlin.dataloader.tensorflow")
+loader = tf_dataloader.Loader
 
 
 def test_array_dataloader_with_tensorflow():
diff --git a/tests/unit/dataloader/test_array_to_torch.py b/tests/unit/dataloader/test_array_to_torch.py
index 7806fbb0..3207eff0 100644
--- a/tests/unit/dataloader/test_array_to_torch.py
+++ b/tests/unit/dataloader/test_array_to_torch.py
@@ -21,8 +21,8 @@
 from merlin.schema import Tags
 
 pytest.importorskip("torch")
-torch_loader = pytest.importorskip("merlin.dataloader.frameworks.torch")
-loader = torch_loader.TorchArrayDataloader
+torch_loader = pytest.importorskip("merlin.dataloader.torch")
+loader = torch_loader.Loader
 
 
 def test_array_dataloader_with_torch():
diff --git a/tests/unit/dataloader/test_embeddings.py b/tests/unit/dataloader/test_embeddings.py
index 67d96f67..fe4b5249 100644
--- a/tests/unit/dataloader/test_embeddings.py
+++ b/tests/unit/dataloader/test_embeddings.py
@@ -19,7 +19,7 @@
 import pytest
 
 from merlin.core.dispatch import HAS_GPU
-from merlin.dataloader.array import ArrayLoader as Loader  # noqa
+from merlin.dataloader.loader_base import LoaderBase as Loader  # noqa
 from merlin.dataloader.ops.embeddings import (  # noqa
     EmbeddingOperator,
     MmapNumpyEmbedding,
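
For reference, a standalone NumPy sketch of the offsets bookkeeping that the relocated _split_values and _subtract_offsets helpers perform when make_tensors slices a flattened list column into per-batch (values, offsets) pairs. The sample arrays are invented for illustration:

import itertools

import numpy as np

# One list column over 5 rows, flattened: row i spans values[offsets[i]:offsets[i + 1]].
values = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
offsets = np.array([0, 2, 3, 6, 6, 10])
rows_per_batch = [2, 2, 1]

# Group the offsets by batch; each group keeps one boundary per row plus a trailing one.
splits = list(itertools.accumulate(rows_per_batch))
grouped = [offsets[start : splits[idx] + 1] for idx, start in enumerate([0] + splits[:-1])]

# Re-base each group so it starts at zero, as _subtract_offsets does.
rebased = [g - grouped[i - 1][-1] if i else g for i, g in enumerate(grouped)]

# The last re-based offset is the value count per batch, which _split_values consumes.
counts = [int(g[-1]) for g in rebased]
batches = np.split(values, list(itertools.accumulate(counts))[:-1])

print(rebased)  # [array([0, 2, 3]), array([0, 3, 3]), array([0, 4])]
print(batches)  # [array([1, 2, 3]), array([4, 5, 6]), array([ 7,  8,  9, 10])]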
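
And a minimal usage sketch of the consolidated loaders, mirroring the updated imports in the tests above. The dataframe and column names here are hypothetical, and labels is None unless the dataset schema tags a target column:

import pandas as pd

from merlin.dataloader.torch import Loader
from merlin.io import Dataset

# Hypothetical two-column dataframe wrapped in a merlin Dataset.
df = pd.DataFrame({"a": [0.1, 0.2, 0.3, 0.4], "clicked": [0, 1, 0, 1]})
loader = Loader(Dataset(df), batch_size=2, shuffle=False)

for inputs, labels in loader:
    # inputs is a dict of torch tensors keyed by column name
    print(inputs["a"], labels)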