From 02aad2124e247e6a4f229d6638eaaec0931aca8c Mon Sep 17 00:00:00 2001
From: Karl Higley
Date: Mon, 13 Feb 2023 15:23:40 -0500
Subject: [PATCH] Replace `nnzs` with `row_lengths` for clarity (#99)

* Replace `nnzs` with `row_lengths` for clarity

The term `nnzs` (i.e. number of non-zeros) comes from sparse tensor
nomenclature, but sparse tensors are an implementation detail of the
dataloaders, not a domain concept we should be propagating throughout
our code base. Instead, let's call them `row_lengths`, a name that
makes sense as part of a ragged tensor representation throughout Merlin.

* Auto-update pre-commit package versions

* Apply auto-formatting to appease the linter
---
 .pre-commit-config.yaml                     | 14 +++---
 merlin/dataloader/loader_base.py            | 50 ++++++++++-----------
 merlin/dataloader/tensorflow.py             |  2 +-
 merlin/dataloader/utils/tf/tf_trainer.py    |  2 +-
 tests/unit/dataloader/test_tf_dataloader.py | 13 +++---
 5 files changed, 40 insertions(+), 41 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 07d9ed42..665c637e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -5,30 +5,30 @@ repos:
     hooks:
       - id: absolufy-imports
   - repo: https://github.com/timothycrosley/isort
-    rev: 5.10.1
+    rev: 5.12.0
     hooks:
       - id: isort
         additional_dependencies: [toml]
         exclude: examples/.*
   # code style
   - repo: https://github.com/python/black
-    rev: 22.6.0
+    rev: 23.1.0
     hooks:
       - id: black
   - repo: https://github.com/pycqa/pylint
-    rev: v2.14.1
+    rev: v2.16.1
     hooks:
       - id: pylint
   - repo: https://github.com/pycqa/flake8
-    rev: 3.9.2
+    rev: 6.0.0
     hooks:
       - id: flake8
   - repo: https://github.com/adrienverge/yamllint
-    rev: v1.28.0
+    rev: v1.29.0
     hooks:
       - id: yamllint
   - repo: https://github.com/pre-commit/mirrors-prettier
-    rev: v2.7.1
+    rev: v3.0.0-alpha.4
     hooks:
       - id: prettier
         types_or: [yaml, markdown]
@@ -46,7 +46,7 @@ repos:
         exclude: ^(docs|examples|tests|setup.py|versioneer.py)
         args: [--config=pyproject.toml]
   - repo: https://github.com/codespell-project/codespell
-    rev: v2.2.1
+    rev: v2.2.2
     hooks:
       - id: codespell
         exclude: .github/.*
diff --git a/merlin/dataloader/loader_base.py b/merlin/dataloader/loader_base.py
index 556bce4b..fd0c0b7e 100644
--- a/merlin/dataloader/loader_base.py
+++ b/merlin/dataloader/loader_base.py
@@ -51,7 +51,7 @@ def _num_steps(num_samples, step_size):
 class LoaderBase:
     """Base class containing common functionality between the PyTorch and TensorFlow dataloaders."""
 
-    _use_nnz = False
+    _use_row_lengths = False
 
     def __init__(
         self,
@@ -337,15 +337,15 @@ def _get_next_batch(self):
         return batch
 
     @annotate("make_tensors", color="darkgreen", domain="merlin_dataloader")
-    def make_tensors(self, gdf, use_nnz=False):
+    def make_tensors(self, gdf, use_row_lengths=False):
         """Turns a gdf into tensor representation by column
 
         Parameters
         ----------
         gdf : DataFrame
             A dataframe type object.
- use_nnz : bool, optional - toggle nnzs or use offsets for list columns, by default False + use_row_lengths : bool, optional + Enable using row lengths instead of offsets for list columns, by default False Returns ------- @@ -357,7 +357,7 @@ def make_tensors(self, gdf, use_nnz=False): # map from big chunk to framework-specific tensors chunks, names = self._create_tensors(gdf) - # if we have any offsets, calculate nnzs up front + # if we have any offsets, calculate row lengths up front # will need to get offsets if list columns detected in schema # if len(chunks) == 4: @@ -368,8 +368,8 @@ def make_tensors(self, gdf, use_nnz=False): ] if len(lists_list) > 0: offsets = chunks[-1] - if use_nnz: - nnzs = offsets[1:] - offsets[:-1] + if use_row_lengths: + row_lengths = offsets[1:] - offsets[:-1] chunks = chunks[:-1] # split them into batches and map to the framework-specific output format @@ -388,43 +388,43 @@ def make_tensors(self, gdf, use_nnz=False): if lists is not None: num_list_columns = len(lists) - # grab the set of offsets and nnzs corresponding to - # the list columns from this chunk + # grab the set of offsets and row lengths + # corresponding to the list columns from this chunk chunk_offsets = offsets[:, offset_idx : offset_idx + num_list_columns] - if use_nnz: - chunk_nnzs = nnzs[:, offset_idx : offset_idx + num_list_columns] + if use_row_lengths: + chunk_row_lengths = row_lengths[:, offset_idx : offset_idx + num_list_columns] offset_idx += num_list_columns # split them into batches, including an extra 1 on the offsets # so we know how long the very last element is batch_offsets = self._split_fn(chunk_offsets, split_idx + [1]) - if use_nnz and len(split_idx) > 1: - batch_nnzs = self._split_fn(chunk_nnzs, split_idx) - elif use_nnz: - batch_nnzs = [chunk_nnzs] + if use_row_lengths and len(split_idx) > 1: + batch_row_lengths = self._split_fn(chunk_row_lengths, split_idx) + elif use_row_lengths: + batch_row_lengths = [chunk_row_lengths] else: - batch_nnzs = [None] * (len(batch_offsets) - 1) + batch_row_lengths = [None] * (len(batch_offsets) - 1) # group all these indices together and iterate through # them in batches to grab the proper elements from each # values tensor - chunk = zip(chunk, batch_offsets[:-1], batch_offsets[1:], batch_nnzs) + chunk = zip(chunk, batch_offsets[:-1], batch_offsets[1:], batch_row_lengths) for n, c in enumerate(chunk): if isinstance(c, tuple): - c, off0s, off1s, _nnzs = c + c, off0s, off1s, _row_lengths = c offsets_split_idx = [1 for _ in range(num_list_columns)] off0s = self._split_fn(off0s, offsets_split_idx, axis=1) off1s = self._split_fn(off1s, offsets_split_idx, axis=1) - if use_nnz: - _nnzs = self._split_fn(_nnzs, offsets_split_idx, axis=1) + if use_row_lengths: + _row_lengths = self._split_fn(_row_lengths, offsets_split_idx, axis=1) # TODO: does this need to be ordereddict? 
batch_lists = {} for k, (column_name, values) in enumerate(lists.items()): off0, off1 = off0s[k], off1s[k] - if use_nnz: - nnz = _nnzs[k] + if use_row_lengths: + row_length = _row_lengths[k] # need to grab scalars for TF case if len(off0.shape) == 1: @@ -435,7 +435,7 @@ def make_tensors(self, gdf, use_nnz=False): print(off0, off1) raise ValueError value = values[int(start) : int(stop)] - index = off0 - start if not use_nnz else nnz + index = off0 - start if not use_row_lengths else row_length batch_lists[column_name] = (value, index) c = (c, batch_lists) @@ -829,7 +829,7 @@ def chunk_logic(self, itr): chunks = shuffle_df(chunks) if len(chunks) > 0: - chunks = self.dataloader.make_tensors(chunks, self.dataloader._use_nnz) + chunks = self.dataloader.make_tensors(chunks, self.dataloader._use_row_lengths) # put returns True if buffer is stopped before # packet can be put in queue. Keeps us from # freezing on a put on a full queue @@ -838,7 +838,7 @@ def chunk_logic(self, itr): chunks = None # takes care final batch, which is less than batch size if not self.dataloader.drop_last and spill is not None and not spill.empty: - spill = self.dataloader.make_tensors(spill, self.dataloader._use_nnz) + spill = self.dataloader.make_tensors(spill, self.dataloader._use_row_lengths) self.put(spill) @annotate("load_chunks", color="darkgreen", domain="merlin_dataloader") diff --git a/merlin/dataloader/tensorflow.py b/merlin/dataloader/tensorflow.py index 8ea71399..c4aab172 100644 --- a/merlin/dataloader/tensorflow.py +++ b/merlin/dataloader/tensorflow.py @@ -102,7 +102,7 @@ class Loader(tf.keras.utils.Sequence, LoaderBase): will usually contain fewer rows. """ - _use_nnz = True + _use_row_lengths = True def __init__( self, diff --git a/merlin/dataloader/utils/tf/tf_trainer.py b/merlin/dataloader/utils/tf/tf_trainer.py index e900bd37..fa3b6840 100644 --- a/merlin/dataloader/utils/tf/tf_trainer.py +++ b/merlin/dataloader/utils/tf/tf_trainer.py @@ -90,7 +90,7 @@ def seed_fn(): for col in CATEGORICAL_MH_COLUMNS: inputs[col] = ( tf.keras.Input(name=f"{col}__values", dtype=tf.int64, shape=(1,)), - tf.keras.Input(name=f"{col}__nnzs", dtype=tf.int64, shape=(1,)), + tf.keras.Input(name=f"{col}__lengths", dtype=tf.int64, shape=(1,)), ) for col in CATEGORICAL_COLUMNS + CATEGORICAL_MH_COLUMNS: emb_layers.append( diff --git a/tests/unit/dataloader/test_tf_dataloader.py b/tests/unit/dataloader/test_tf_dataloader.py index 3e82e6fe..f70b7823 100644 --- a/tests/unit/dataloader/test_tf_dataloader.py +++ b/tests/unit/dataloader/test_tf_dataloader.py @@ -382,7 +382,7 @@ def test_mh_support(tmpdir, multihot_data, multihot_dataset, batch_size): batch_size=batch_size, shuffle=False, ) - nnzs = None + row_lengths = None idx = 0 for X, y in data_itr: @@ -391,18 +391,18 @@ def test_mh_support(tmpdir, multihot_data, multihot_dataset, batch_size): for mh_name in ["Authors", "Reviewers", "Embedding"]: # assert (mh_name) in X - array, nnzs = X[mh_name] - nnzs = nnzs.numpy()[:, 0] + array, row_lengths = X[mh_name] + row_lengths = row_lengths.numpy()[:, 0] array = array.numpy()[:, 0] if mh_name == "Embedding": - assert (nnzs == 3).all() + assert (row_lengths == 3).all() else: lens = [ len(x) for x in multihot_data[mh_name][idx * batch_size : idx * batch_size + n_samples] ] - assert (nnzs == np.array(lens)).all() + assert (row_lengths == np.array(lens)).all() if mh_name == "Embedding": assert len(array) == (n_samples * 3) @@ -533,7 +533,7 @@ def test_sparse_tensors(tmpdir, sparse_dense): for batch in data_itr: feats, labs = batch for col in 
spa_lst: - # grab nnzs + # grab row lengths feature_tensor = feats[f"{col}"] if not sparse_dense: assert list(feature_tensor.shape) == [batch_size, spa_mx[col]] @@ -648,7 +648,6 @@ def test_dataloader_schema(tmpdir, dataset, batch_size, cpu): batch_size=batch_size, shuffle=False, ) as data_loader: - batch = data_loader.peek() columns = set(dataset.schema.column_names) - {"label"}
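
Note on the rename: the quantity this patch calls `row_lengths` is exactly what `make_tensors` computes above as the difference between consecutive offsets. A minimal NumPy sketch of the offsets/row-lengths relationship, using made-up data that is not part of the patch:

    import numpy as np

    # Offsets mark where each row's values start and stop in the flat values
    # array; row lengths are the per-row element counts. This mirrors
    # `row_lengths = offsets[1:] - offsets[:-1]` in the patch.
    offsets = np.array([0, 2, 2, 5, 6])  # four rows, one of them empty
    row_lengths = offsets[1:] - offsets[:-1]  # -> array([2, 0, 3, 1])

    # Going back: offsets are the cumulative sum of row lengths with a leading 0.
    assert (np.concatenate([[0], np.cumsum(row_lengths)]) == offsets).all()

    # Slicing the flat values array by consecutive offset pairs recovers each row.
    values = np.array([10, 11, 20, 21, 22, 30])
    rows = [values[start:stop] for start, stop in zip(offsets[:-1], offsets[1:])]
    # rows -> [[10, 11], [], [20, 21, 22], [30]]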
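On the TensorFlow side, multi-hot features arrive as `(values, row_lengths)` pairs (see the `__values`/`__lengths` Keras inputs in `tf_trainer.py` and the unpacking in `test_mh_support` above). Below is a sketch, with made-up tensors rather than actual dataloader output, of how such a pair corresponds to TensorFlow's ragged representation; row lengths and "row splits" (offsets) are interchangeable encodings:

    import tensorflow as tf

    # A multi-hot column as a flat values tensor plus per-row counts
    # (the quantity this patch renames from `nnzs` to `row_lengths`).
    values = tf.constant([1, 2, 3, 4, 5, 6], dtype=tf.int64)
    row_lengths = tf.constant([2, 1, 3], dtype=tf.int64)

    # Ragged construction directly from row lengths...
    ragged = tf.RaggedTensor.from_row_lengths(values, row_lengths)
    # <tf.RaggedTensor [[1, 2], [3], [4, 5, 6]]>

    # ...or equivalently from offsets, which TensorFlow calls row splits.
    row_splits = tf.concat([tf.constant([0], tf.int64), tf.cumsum(row_lengths)], axis=0)
    assert ragged.to_list() == tf.RaggedTensor.from_row_splits(values, row_splits).to_list()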