
adding array version of dataloader #111

Merged (24 commits) Mar 31, 2023

Conversation

jperez999
Collaborator

This PR adds a new data loader that creates numpy tensors on CPU and cupy tensors on GPU, depending on the device.
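The CPU/GPU split described above can be sketched roughly as follows (illustrative only: `make_tensor` and `use_gpu` are hypothetical names, not this PR's actual API):

```python
# Rough sketch of device-dependent tensor creation. `make_tensor` and
# `use_gpu` are illustrative stand-ins, not the PR's actual interface.
def make_tensor(data, use_gpu=False):
    if use_gpu:
        import cupy as xp   # GPU path: cupy arrays
    else:
        import numpy as xp  # CPU path: numpy arrays
    return xp.asarray(data)

t = make_tensor([1, 2, 3])  # CPU -> numpy array
assert list(t) == [1, 2, 3]
```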

@jperez999 jperez999 added the enhancement New feature or request label Mar 14, 2023
@jperez999 jperez999 added this to the Merlin 23.03 milestone Mar 14, 2023
@jperez999 jperez999 self-assigned this Mar 14, 2023
merlin/dataloader/array.py Show resolved Hide resolved
merlin/dataloader/array.py Outdated Show resolved Hide resolved
@karlhigley karlhigley modified the milestones: Merlin 23.03, Merlin 23.04 Mar 22, 2023
@karlhigley karlhigley requested a review from edknv March 24, 2023 20:32
@oliverholworthy
Member

oliverholworthy commented Mar 27, 2023

I've been looking at comparing load times between the array version and the current version. The array version from the latest commit on this PR appears to take longer to load in most cases. The exception is the TensorFlow Loader with scalar features of the same type, which is slower with the current dataloader version.

import random
import time

import cupy
import cudf

from merlin.io import Dataset

def get_dataset(num_rows, *, num_list_features=0, num_int_features=0, num_float_features=0):
    list_features = {
        f"list_{i}": [[random.randint(1, 10) for _ in range(4)] for _ in range(num_rows)]
        for i in range(num_list_features)
    }
    scalar_int_features = {
        f"scalar_int_{i}": cupy.random.randint(1, 10, size=num_rows)
        for i in range(num_int_features)
    }
    scalar_float_features = {
        f"scalar_float_{i}": cupy.random.uniform(size=num_rows)
        for i in range(num_float_features)
    }
    features = {**list_features, **scalar_int_features, **scalar_float_features}
    df = cudf.DataFrame(features)
    return Dataset(df)


def dataset_load_time(dataset, loader_cls, batch_size):
    start_t = time.time()
    for batch in loader_cls(dataset, batch_size=batch_size):
        pass
    end_t = time.time()
    return end_t - start_t


from merlin.dataloader.tensorflow import Loader as TFLoader
from merlin.dataloader.torch import Loader as TorchLoader

# Array Versions (PR #111)
from merlin.dataloader.frameworks.torch import TorchArrayDataloader
from merlin.dataloader.frameworks.tensorflow import TFArrayDataloader

# -----------------------------------------------------------------------------
# List Features

dataset = get_dataset(4000, num_list_features=10)
batch_size = 10

print("\n# List Features")

print("\nTensorFlow")
for loader_cls in [TFArrayDataloader, TFLoader]:
    load_time = dataset_load_time(dataset, loader_cls, batch_size=batch_size)
    print(loader_cls.__name__, f"{load_time:.02f} seconds")

print("\nTorch")
for loader_cls in [TorchArrayDataloader, TorchLoader]:
    load_time = dataset_load_time(dataset, loader_cls, batch_size=batch_size)
    print(loader_cls.__name__, f"{load_time:.02f} seconds")

# -----------------------------------------------------------------------------
# Scalar Features

dataset = get_dataset(100_000, num_int_features=10)
batch_size = 10

print("\n# Scalar Features")

print("\nTensorFlow")
for loader_cls in [TFArrayDataloader, TFLoader]:
    load_time = dataset_load_time(dataset, loader_cls, batch_size=batch_size)
    print(loader_cls.__name__, f"{load_time:.02f} seconds")

print("\nTorch")
for loader_cls in [TorchArrayDataloader, TorchLoader]:
    load_time = dataset_load_time(dataset, loader_cls, batch_size=batch_size)
    print(loader_cls.__name__, f"{load_time:.02f} seconds")
# List Features

TensorFlow
TFArrayDataloader 26.07 seconds
Loader 0.89 seconds

Torch
TorchArrayDataloader 5.76 seconds
Loader 0.37 seconds

# Scalar Features

TensorFlow
TFArrayDataloader 4.50 seconds
Loader 12.71 seconds

Torch
TorchArrayDataloader 5.40 seconds
Loader 0.36 seconds

@karlhigley
Contributor

Given that the array loader is faster for TF scalar features, I'd guess that we might be hitting issues with the reshapes required to get list features through DLPack? It seems like it's more than that, but that might be one contributor.

@karlhigley
Contributor

karlhigley commented Mar 27, 2023

After profiling with pyinstrument, it looks like TensorColumn's constructor is slow due to the validation of values/offsets and reverse engineering of the shape from them. If I disable those two lines by commenting them out and setting the shape to Shape(), then I get this:

# List Features

TensorFlow
TFArrayDataloader 1.56 seconds
Loader 0.86 seconds

Torch
TorchArrayDataloader 2.10 seconds
Loader 0.39 seconds

# Scalar Features

TensorFlow
TFArrayDataloader 5.64 seconds
Loader 14.75 seconds

Torch
TorchArrayDataloader 7.34 seconds
Loader 0.49 seconds

Profile that led me down that path:
[pyinstrument profile screenshot]
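The offsets-to-shape work being profiled can be sketched like this (a hypothetical illustration of the pattern, not the actual TensorColumn code): given a flat values buffer and an offsets array, per-row lengths are recovered by differencing the offsets, and a fixed 2-D shape is only known after checking that all rows have equal length, which is per-column, per-batch work.

```python
# Hypothetical sketch of recovering a list column's shape from offsets.
# values holds all list elements flattened; offsets[i]:offsets[i+1] is row i.
values = [1, 2, 3, 4, 5, 6, 7, 8]
offsets = [0, 4, 8]          # two rows of four elements each

row_lengths = [offsets[i + 1] - offsets[i] for i in range(len(offsets) - 1)]
num_rows = len(offsets) - 1

# If every row has the same length, the column has a fixed 2-D shape;
# otherwise it is ragged. This check is the kind of per-batch cost profiled.
if len(set(row_lengths)) == 1:
    shape = (num_rows, row_lengths[0])
else:
    shape = (num_rows, None)

assert shape == (2, 4)
```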

@karlhigley
Contributor

If I further disable the dtype conversion and set self._dtype = None, I can get the scalar columns case down to this:

Torch
TorchArrayDataloader 5.72 seconds
Loader 0.54 seconds

But at that point we start to hit a limitation of Torch's DLPack implementation, which grabs the current CUDA stream every time torch.utils.dlpack.from_dlpack() gets called.

@karlhigley
Contributor

Turns out that the CUDA initialization is getting counted as part of whichever loader goes first, so I added this line above the profiling:

force_init = th.tensor([]).cuda()

And due to the issue with from_dlpack getting the current stream over and over again, I amended this function from this:

    def _from_dlpack_gpu_to_torch(target_type, array):
        return th.utils.dlpack.from_dlpack(array)

to this:

    def _from_dlpack_gpu_to_torch(target_type, array):
        return th.utils.dlpack.from_dlpack(array.__dlpack__())

which uses the older-style DLPack API, bypassing Torch's stream-handling code, but still seems to work okay.

That gets me to:

Torch
Loader 0.52 seconds
TorchArrayDataloader 2.80 seconds

where the performance difference seems to break down to about 2/3 converting through DLPack and 1/3 creating TensorTable objects.
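The stream-handling difference between the two call styles can be illustrated schematically in pure Python (all names below are stand-ins; this does not reproduce the real torch/DLPack machinery): a consumer handed a producer *object* may query device/stream state before asking for the capsule, while a consumer handed a ready-made capsule skips that step.

```python
# Schematic illustration only: FakeGpuArray, get_current_stream, and this
# from_dlpack are stand-ins, not the real torch or DLPack implementations.
stream_queries = []

def get_current_stream():               # stand-in for a CUDA stream lookup
    stream_queries.append("query")
    return 0

class FakeGpuArray:
    def __dlpack__(self, stream=None):  # produces the (fake) capsule
        return "capsule"
    def __dlpack_device__(self):
        return (2, 0)                   # (kDLCUDA, device 0)

def from_dlpack(obj):
    if isinstance(obj, FakeGpuArray):   # object path: negotiate the stream
        stream = get_current_stream()
        return obj.__dlpack__(stream=stream)
    return obj                          # capsule path: nothing to negotiate

arr = FakeGpuArray()
from_dlpack(arr)                # object path: triggers one stream query
from_dlpack(arr.__dlpack__())   # capsule path: no stream query
assert len(stream_queries) == 1
```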

@karlhigley
Contributor

karlhigley commented Mar 27, 2023

Initializing both frameworks ahead of time and using the older PyTorch DLPack API, along with disabling the values/offsets validation, shape computation, and dtype coercion gets me to here:

# List Features

TensorFlow
TFArrayDataloader 1.18 seconds
Loader 0.83 seconds

Torch
TorchArrayDataloader 0.87 seconds
Loader 0.38 seconds

# Scalar Features

TensorFlow
TFArrayDataloader 1.75 seconds
Loader 12.46 seconds

Torch
TorchArrayDataloader 1.84 seconds
Loader 0.41 seconds

The existing TF dataloader seems to be spending a lot of time reshaping scalar columns, which we could probably improve.

@oliverholworthy
Member

The existing TF dataloader seems to be spending a lot of time reshaping scalar columns, which we could probably improve.

I've opened PR #116 to speed up the TF dataloader for scalar columns (it makes the Torch versions slightly faster too). We have an unreleased performance regression that was introduced when we recently added the reshape as part of the output shape change from 2-D to 1-D in #101.

@oliverholworthy
Member

Using the same script from this comment with the latest version of this branch alongside the latest version of core, I'm still seeing a big difference in loading time between the two versions.

One thing to note is that a modification is required to make this script work. One way to get it to run is to copy the modified loader_base.py so that it's used only by the array loader, and revert the changes to the main loader_base. Otherwise you get AttributeError: EagerTensor object has no attribute 'tolist'.
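One conceivable way to paper over that AttributeError (a hypothetical shim, not code from either branch) is a helper that falls back to .numpy().tolist() for tensors, like TF's EagerTensor, that expose .numpy() but not .tolist():

```python
# Hypothetical compatibility shim: prefer .tolist(), fall back to
# .numpy().tolist(), else plain list(). Not code from either branch.
def to_pylist(x):
    if hasattr(x, "tolist"):
        return x.tolist()
    if hasattr(x, "numpy"):
        return x.numpy().tolist()
    return list(x)

# Stand-ins that mimic an EagerTensor-like object, for illustration only.
class FakeNdarray(list):
    def tolist(self):
        return list(self)

class FakeEagerTensor:
    def __init__(self, data):
        self._data = data
    def numpy(self):
        return FakeNdarray(self._data)

assert to_pylist(FakeEagerTensor([1, 2, 3])) == [1, 2, 3]
```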

# List Features

TensorFlow
TFArrayDataloader 13.70 seconds
Loader 0.91 seconds

Torch
TorchArrayDataloader 3.41 seconds
Loader 0.37 seconds

# Scalar Features

TensorFlow
TFArrayDataloader 2.43 seconds
Loader 0.38 seconds

Torch
TorchArrayDataloader 2.08 seconds
Loader 0.15 seconds

@karlhigley
Contributor

karlhigley commented Mar 30, 2023

There's still a timing difference because the changes required to take advantage of the dispatching optimizations in Core aren't yet present in this PR.

@karlhigley
Contributor

karlhigley commented Mar 30, 2023

With this PR, core/pull/264, and the following profiling script, I get these timings:

===== List Features =====

TensorFlow
Loader 2.72 seconds
TFArrayDataloader 0.93 seconds

Torch
Loader 1.11 seconds
TorchArrayDataloader 0.87 seconds

===== Scalar Features =====

TensorFlow
Loader 0.09 seconds
TFArrayDataloader 0.26 seconds

Torch
Loader 0.04 seconds
TorchArrayDataloader 0.20 seconds

===== Mixed Features =====

TensorFlow
Loader 1.27 seconds
TFArrayDataloader 0.59 seconds

Torch
Loader 0.58 seconds
TorchArrayDataloader 0.52 seconds
import random
import time

import cupy
import cudf
import tensorflow as tf
import torch as th

from merlin.io import Dataset

from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()

def get_dataset(num_rows, *, num_list_features=0, num_int_features=0, num_float_features=0):
    list_features = {
        f"list_{i}": [[random.randint(1, 10) for _ in range(4)] for _ in range(num_rows)]
        for i in range(num_list_features)
    }
    scalar_int_features = {
        f"scalar_int_{i}": cupy.random.randint(1, 10, size=num_rows)
        for i in range(num_int_features)
    }
    scalar_float_features = {
        f"scalar_float_{i}": cupy.random.uniform(size=num_rows)
        for i in range(num_float_features)
    }
    features = {**list_features, **scalar_int_features, **scalar_float_features}
    df = cudf.DataFrame(features)
    return Dataset(df)


def dataset_load_time(dataset, loader_cls, batch_size):
    start_t = time.time()
    for batch in loader_cls(dataset, batch_size=batch_size):
        pass
    end_t = time.time()
    return end_t - start_t


from merlin.dataloader.tensorflow import Loader as TFLoader
from merlin.dataloader.torch import Loader as TorchLoader

# Array Versions (PR #111)
from merlin.dataloader.frameworks.torch import TorchArrayDataloader
from merlin.dataloader.frameworks.tensorflow import TFArrayDataloader

with tf.device("gpu"):
    tf_force_init = tf.constant([1,2,3])

th_force_init = th.tensor([1,2,3]).cuda()

# -----------------------------------------------------------------------------
# List Features

print("\nList Features\n")

dataset = get_dataset(1_000_000, num_list_features=10)
batch_size = 1000

print("TensorFlow")
for loader_cls in [TFLoader, TFArrayDataloader]:
    load_time = dataset_load_time(dataset, loader_cls, batch_size=batch_size)
    print(loader_cls.__name__, f"{load_time:.02f} seconds")

print("Torch")
for loader_cls in [TorchLoader, TorchArrayDataloader]:
    load_time = dataset_load_time(dataset, loader_cls, batch_size=batch_size)
    print(loader_cls.__name__, f"{load_time:.02f} seconds")

# -----------------------------------------------------------------------------
# Scalar Features

print("\nScalar Features\n")

dataset = get_dataset(1_000_000, num_int_features=10)
batch_size = 1000

print("TensorFlow")
for loader_cls in [TFLoader, TFArrayDataloader]:
    load_time = dataset_load_time(dataset, loader_cls, batch_size=batch_size)
    print(loader_cls.__name__, f"{load_time:.02f} seconds")

print("Torch")
for loader_cls in [TorchLoader, TorchArrayDataloader]:
    load_time = dataset_load_time(dataset, loader_cls, batch_size=batch_size)
    print(loader_cls.__name__, f"{load_time:.02f} seconds")


# -----------------------------------------------------------------------------
# Mixed Features

print("\nMixed Features\n")

dataset = get_dataset(1_000_000, num_list_features=5, num_int_features=5)
batch_size = 1000

print("TensorFlow")
for loader_cls in [TFLoader, TFArrayDataloader]:
    load_time = dataset_load_time(dataset, loader_cls, batch_size=batch_size)
    print(loader_cls.__name__, f"{load_time:.02f} seconds")

print("Torch")
for loader_cls in [TorchLoader, TorchArrayDataloader]:
    load_time = dataset_load_time(dataset, loader_cls, batch_size=batch_size)
    print(loader_cls.__name__, f"{load_time:.02f} seconds")

@oliverholworthy
Member

I've tried the same and I'm getting similar results.

# List Features

TensorFlow
TFArrayDataloader 1.47 seconds
Loader 0.88 seconds
TFArrayDataloader 0.37 seconds
Loader 0.88 seconds
TFArrayDataloader 0.37 seconds
Loader 0.85 seconds
TFArrayDataloader 0.58 seconds
Loader 0.85 seconds
TFArrayDataloader 0.37 seconds
Loader 0.87 seconds
TFArrayDataloader 0.37 seconds
Loader 0.85 seconds
TFArrayDataloader 0.37 seconds
Loader 0.86 seconds
TFArrayDataloader 0.59 seconds
Loader 0.85 seconds
TFArrayDataloader 0.36 seconds
Loader 0.85 seconds
TFArrayDataloader 0.37 seconds
Loader 0.84 seconds

Torch
TorchArrayDataloader 1.51 seconds
Loader 0.36 seconds
TorchArrayDataloader 0.57 seconds
Loader 0.36 seconds
TorchArrayDataloader 0.35 seconds
Loader 0.38 seconds
TorchArrayDataloader 0.35 seconds
Loader 0.36 seconds
TorchArrayDataloader 0.34 seconds
Loader 0.36 seconds
TorchArrayDataloader 0.52 seconds
Loader 0.36 seconds
TorchArrayDataloader 0.37 seconds
Loader 0.36 seconds
TorchArrayDataloader 0.35 seconds
Loader 0.36 seconds
TorchArrayDataloader 0.35 seconds
Loader 0.36 seconds
TorchArrayDataloader 0.35 seconds
Loader 0.37 seconds

# Scalar Features

TensorFlow
TFArrayDataloader 2.28 seconds
Loader 0.19 seconds
TFArrayDataloader 2.18 seconds
Loader 0.35 seconds
TFArrayDataloader 2.20 seconds
Loader 0.36 seconds
TFArrayDataloader 2.19 seconds
Loader 0.37 seconds
TFArrayDataloader 2.23 seconds
Loader 0.18 seconds
TFArrayDataloader 2.17 seconds
Loader 0.36 seconds
TFArrayDataloader 2.15 seconds
Loader 0.35 seconds
TFArrayDataloader 2.19 seconds
Loader 0.37 seconds
TFArrayDataloader 2.18 seconds
Loader 0.18 seconds
TFArrayDataloader 2.37 seconds
Loader 0.17 seconds

Torch
TorchArrayDataloader 1.69 seconds
Loader 0.34 seconds
TorchArrayDataloader 1.64 seconds
Loader 0.35 seconds
TorchArrayDataloader 1.62 seconds
Loader 0.16 seconds
TorchArrayDataloader 1.81 seconds
Loader 0.16 seconds
TorchArrayDataloader 1.63 seconds
Loader 0.34 seconds
TorchArrayDataloader 1.64 seconds
Loader 0.35 seconds
TorchArrayDataloader 1.64 seconds
Loader 0.16 seconds
TorchArrayDataloader 1.60 seconds
Loader 0.34 seconds
TorchArrayDataloader 1.63 seconds
Loader 0.35 seconds
TorchArrayDataloader 1.62 seconds
Loader 0.16 seconds

@oliverholworthy
Member

The last example used the dataset from the first example, run 10 times. One thing I notice here is that the first run on list features is slower for the array-based implementation. What is the mechanism that causes subsequent runs to load faster? Since this timing includes the instantiation of a new loader each time, is there some global state that is being changed?

@oliverholworthy
Copy link
Member

And could the same mechanism that's responsible for the speed-up of list features after the first run have something to do with why the scalar features are 5-10x slower? With list features, after the first run, the array loader actually loads faster than the equivalent current loader.

@karlhigley
Contributor

I'm pretty sure the first run takes longer no matter which version of the dataloader you use, because whichever comes first gets attributed the cost of initializing the framework. In the version of the profiling script included above, I addressed that by forcing the frameworks to initialize outside the timing function:

with tf.device("gpu"):
    tf_force_init = tf.constant([1,2,3])

th_force_init = th.tensor([1,2,3]).cuda()

@karlhigley
Contributor

Seems like we need the PR that swaps out DictArray for TensorTable to go through first, but the tests on that are failing due to some kind of CuPy/cuDF interoperability issue.
