Replace nnzs with row_lengths for clarity (#99)
* Replace `nnzs` with `row_lengths` for clarity

The term `nnzs` (i.e. number of non-zeros) comes from sparse tensor nomenclature, but sparse tensors are an implementation detail of the dataloaders, not a domain concept we should be propagating throughout our code base. Instead, let's call them `row_lengths`, a name that makes sense as part of a ragged tensor representation throughout Merlin (see the sketch below).

* Autoupdate pre-commit package versions

* Apply auto-formatting to appease the linter
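
To make the rename concrete, here is a minimal sketch of how offsets and row lengths describe the same ragged list column. It is illustrative only (not part of the commit) and assumes plain NumPy:

```python
import numpy as np

# A ragged list column with rows [10, 11], [12], [13, 14, 15]
values = np.array([10, 11, 12, 13, 14, 15])
offsets = np.array([0, 2, 3, 6])          # row i spans values[offsets[i]:offsets[i + 1]]
row_lengths = offsets[1:] - offsets[:-1]  # -> [2, 1, 3], the per-row element counts

assert row_lengths.sum() == len(values)   # both forms account for every value exactly once
```

Viewed through sparse-tensor glasses, those per-row counts are "numbers of non-zeros" (`nnzs`); viewed as a ragged tensor, they are simply row lengths, which is the framing this commit adopts.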
karlhigley authored Feb 13, 2023
1 parent 2aeb862 commit 02aad21
Showing 5 changed files with 40 additions and 41 deletions.
14 changes: 7 additions & 7 deletions .pre-commit-config.yaml
@@ -5,30 +5,30 @@ repos:
hooks:
- id: absolufy-imports
- repo: https://github.com/timothycrosley/isort
- rev: 5.10.1
+ rev: 5.12.0
hooks:
- id: isort
additional_dependencies: [toml]
exclude: examples/.*
# code style
- repo: https://github.com/python/black
- rev: 22.6.0
+ rev: 23.1.0
hooks:
- id: black
- repo: https://github.com/pycqa/pylint
- rev: v2.14.1
+ rev: v2.16.1
hooks:
- id: pylint
- repo: https://github.com/pycqa/flake8
- rev: 3.9.2
+ rev: 6.0.0
hooks:
- id: flake8
- repo: https://github.com/adrienverge/yamllint
- rev: v1.28.0
+ rev: v1.29.0
hooks:
- id: yamllint
- repo: https://github.com/pre-commit/mirrors-prettier
- rev: v2.7.1
+ rev: v3.0.0-alpha.4
hooks:
- id: prettier
types_or: [yaml, markdown]
@@ -46,7 +46,7 @@ repos:
exclude: ^(docs|examples|tests|setup.py|versioneer.py)
args: [--config=pyproject.toml]
- repo: https://github.com/codespell-project/codespell
- rev: v2.2.1
+ rev: v2.2.2
hooks:
- id: codespell
exclude: .github/.*
50 changes: 25 additions & 25 deletions merlin/dataloader/loader_base.py
@@ -51,7 +51,7 @@ def _num_steps(num_samples, step_size):
class LoaderBase:
"""Base class containing common functionality between the PyTorch and TensorFlow dataloaders."""

- _use_nnz = False
+ _use_row_lengths = False

def __init__(
self,
@@ -337,15 +337,15 @@ def _get_next_batch(self):
return batch

@annotate("make_tensors", color="darkgreen", domain="merlin_dataloader")
- def make_tensors(self, gdf, use_nnz=False):
+ def make_tensors(self, gdf, use_row_lengths=False):
"""Turns a gdf into tensor representation by column
Parameters
----------
gdf : DataFrame
A dataframe type object.
- use_nnz : bool, optional
-     toggle nnzs or use offsets for list columns, by default False
+ use_row_lengths : bool, optional
+     Enable using row lengths instead of offsets for list columns, by default False
Returns
-------
@@ -357,7 +357,7 @@ def make_tensors(self, gdf, use_nnz=False):
# map from big chunk to framework-specific tensors
chunks, names = self._create_tensors(gdf)

- # if we have any offsets, calculate nnzs up front
+ # if we have any offsets, calculate row lengths up front
# will need to get offsets if list columns detected in schema

# if len(chunks) == 4:
@@ -368,8 +368,8 @@
]
if len(lists_list) > 0:
offsets = chunks[-1]
- if use_nnz:
-     nnzs = offsets[1:] - offsets[:-1]
+ if use_row_lengths:
+     row_lengths = offsets[1:] - offsets[:-1]
chunks = chunks[:-1]

# split them into batches and map to the framework-specific output format
@@ -388,43 +388,43 @@
if lists is not None:
num_list_columns = len(lists)

- # grab the set of offsets and nnzs corresponding to
- # the list columns from this chunk
+ # grab the set of offsets and row lengths
+ # corresponding to the list columns from this chunk
chunk_offsets = offsets[:, offset_idx : offset_idx + num_list_columns]
- if use_nnz:
-     chunk_nnzs = nnzs[:, offset_idx : offset_idx + num_list_columns]
+ if use_row_lengths:
+     chunk_row_lengths = row_lengths[:, offset_idx : offset_idx + num_list_columns]
offset_idx += num_list_columns

# split them into batches, including an extra 1 on the offsets
# so we know how long the very last element is
batch_offsets = self._split_fn(chunk_offsets, split_idx + [1])
- if use_nnz and len(split_idx) > 1:
-     batch_nnzs = self._split_fn(chunk_nnzs, split_idx)
- elif use_nnz:
-     batch_nnzs = [chunk_nnzs]
+ if use_row_lengths and len(split_idx) > 1:
+     batch_row_lengths = self._split_fn(chunk_row_lengths, split_idx)
+ elif use_row_lengths:
+     batch_row_lengths = [chunk_row_lengths]
else:
- batch_nnzs = [None] * (len(batch_offsets) - 1)
+ batch_row_lengths = [None] * (len(batch_offsets) - 1)

# group all these indices together and iterate through
# them in batches to grab the proper elements from each
# values tensor
- chunk = zip(chunk, batch_offsets[:-1], batch_offsets[1:], batch_nnzs)
+ chunk = zip(chunk, batch_offsets[:-1], batch_offsets[1:], batch_row_lengths)

for n, c in enumerate(chunk):
if isinstance(c, tuple):
- c, off0s, off1s, _nnzs = c
+ c, off0s, off1s, _row_lengths = c
offsets_split_idx = [1 for _ in range(num_list_columns)]
off0s = self._split_fn(off0s, offsets_split_idx, axis=1)
off1s = self._split_fn(off1s, offsets_split_idx, axis=1)
- if use_nnz:
-     _nnzs = self._split_fn(_nnzs, offsets_split_idx, axis=1)
+ if use_row_lengths:
+     _row_lengths = self._split_fn(_row_lengths, offsets_split_idx, axis=1)

# TODO: does this need to be ordereddict?
batch_lists = {}
for k, (column_name, values) in enumerate(lists.items()):
off0, off1 = off0s[k], off1s[k]
- if use_nnz:
-     nnz = _nnzs[k]
+ if use_row_lengths:
+     row_length = _row_lengths[k]

# need to grab scalars for TF case
if len(off0.shape) == 1:
@@ -435,7 +435,7 @@ def make_tensors(self, gdf, use_nnz=False):
print(off0, off1)
raise ValueError
value = values[int(start) : int(stop)]
- index = off0 - start if not use_nnz else nnz
+ index = off0 - start if not use_row_lengths else row_length
batch_lists[column_name] = (value, index)
c = (c, batch_lists)

@@ -829,7 +829,7 @@ def chunk_logic(self, itr):
chunks = shuffle_df(chunks)

if len(chunks) > 0:
- chunks = self.dataloader.make_tensors(chunks, self.dataloader._use_nnz)
+ chunks = self.dataloader.make_tensors(chunks, self.dataloader._use_row_lengths)
# put returns True if buffer is stopped before
# packet can be put in queue. Keeps us from
# freezing on a put on a full queue
@@ -838,7 +838,7 @@
chunks = None
# takes care final batch, which is less than batch size
if not self.dataloader.drop_last and spill is not None and not spill.empty:
- spill = self.dataloader.make_tensors(spill, self.dataloader._use_nnz)
+ spill = self.dataloader.make_tensors(spill, self.dataloader._use_row_lengths)
self.put(spill)

@annotate("load_chunks", color="darkgreen", domain="merlin_dataloader")
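
For orientation, here is a hypothetical, heavily simplified sketch of the list-column slicing that `make_tensors` performs, using the new `use_row_lengths` naming. It is not the actual dataloader code: the chunking, shuffling, and framework-specific tensor handling are all omitted, and plain NumPy stands in for the framework tensors:

```python
import numpy as np

def slice_list_column(values, offsets, batch_start, batch_end, use_row_lengths=False):
    """Return (values, index) for rows [batch_start, batch_end) of one ragged column."""
    off0 = offsets[batch_start : batch_end]          # start offset of each row in the batch
    off1 = offsets[batch_start + 1 : batch_end + 1]  # end offset of each row in the batch
    start, stop = int(off0[0]), int(off1[-1])
    batch_values = values[start:stop]
    # the accompanying index is either per-row lengths or offsets rebased to the batch start
    index = (off1 - off0) if use_row_lengths else (off0 - start)
    return batch_values, index

values = np.arange(6)
offsets = np.array([0, 2, 3, 6])
print(slice_list_column(values, offsets, 1, 3, use_row_lengths=True))   # (array([2, 3, 4, 5]), array([1, 3]))
print(slice_list_column(values, offsets, 1, 3, use_row_lengths=False))  # (array([2, 3, 4, 5]), array([0, 1]))
```

Either index form identifies how the batch's flat values split back into rows; the rename only changes what the toggle and the derived quantity are called.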
2 changes: 1 addition & 1 deletion merlin/dataloader/tensorflow.py
@@ -102,7 +102,7 @@ class Loader(tf.keras.utils.Sequence, LoaderBase):
will usually contain fewer rows.
"""

- _use_nnz = True
+ _use_row_lengths = True

def __init__(
self,
2 changes: 1 addition & 1 deletion merlin/dataloader/utils/tf/tf_trainer.py
@@ -90,7 +90,7 @@ def seed_fn():
for col in CATEGORICAL_MH_COLUMNS:
inputs[col] = (
tf.keras.Input(name=f"{col}__values", dtype=tf.int64, shape=(1,)),
- tf.keras.Input(name=f"{col}__nnzs", dtype=tf.int64, shape=(1,)),
+ tf.keras.Input(name=f"{col}__lengths", dtype=tf.int64, shape=(1,)),
)
for col in CATEGORICAL_COLUMNS + CATEGORICAL_MH_COLUMNS:
emb_layers.append(
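
Purely as an illustration (not code from this repository): on the TensorFlow side, a `(values, row_lengths)` pair for a multi-hot column can be reassembled into a ragged tensor with `tf.RaggedTensor.from_row_lengths`. In the trainer script above the two parts arrive as Keras inputs of shape `(None, 1)`, so the trailing dimension would need to be squeezed first; the sketch below skips that detail:

```python
import tensorflow as tf

values = tf.constant([3, 7, 7, 1, 4, 9], dtype=tf.int64)       # flattened item ids
row_lengths = tf.constant([2, 1, 3], dtype=tf.int64)           # items per example
ragged = tf.RaggedTensor.from_row_lengths(values, row_lengths)  # shape [3, None]
print(ragged)  # <tf.RaggedTensor [[3, 7], [7], [1, 4, 9]]>
```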
13 changes: 6 additions & 7 deletions tests/unit/dataloader/test_tf_dataloader.py
@@ -382,7 +382,7 @@ def test_mh_support(tmpdir, multihot_data, multihot_dataset, batch_size):
batch_size=batch_size,
shuffle=False,
)
- nnzs = None
+ row_lengths = None
idx = 0

for X, y in data_itr:
@@ -391,18 +391,18 @@ def test_mh_support(tmpdir, multihot_data, multihot_dataset, batch_size):

for mh_name in ["Authors", "Reviewers", "Embedding"]:
# assert (mh_name) in X
- array, nnzs = X[mh_name]
- nnzs = nnzs.numpy()[:, 0]
+ array, row_lengths = X[mh_name]
+ row_lengths = row_lengths.numpy()[:, 0]
array = array.numpy()[:, 0]

if mh_name == "Embedding":
- assert (nnzs == 3).all()
+ assert (row_lengths == 3).all()
else:
lens = [
len(x)
for x in multihot_data[mh_name][idx * batch_size : idx * batch_size + n_samples]
]
- assert (nnzs == np.array(lens)).all()
+ assert (row_lengths == np.array(lens)).all()

if mh_name == "Embedding":
assert len(array) == (n_samples * 3)
@@ -533,7 +533,7 @@ def test_sparse_tensors(tmpdir, sparse_dense):
for batch in data_itr:
feats, labs = batch
for col in spa_lst:
- # grab nnzs
+ # grab row lengths
feature_tensor = feats[f"{col}"]
if not sparse_dense:
assert list(feature_tensor.shape) == [batch_size, spa_mx[col]]
@@ -648,7 +648,6 @@ def test_dataloader_schema(tmpdir, dataset, batch_size, cpu):
batch_size=batch_size,
shuffle=False,
) as data_loader:

batch = data_loader.peek()

columns = set(dataset.schema.column_names) - {"label"}
