Potentially improve join performance by implementing a version of the take kernel that accepts an iterator of indices #13620

Open
Tracked by #15885
alihan-synnada opened this issue Dec 2, 2024 · 11 comments
Labels
enhancement New feature or request

Comments

@alihan-synnada
Contributor

Is your feature request related to a problem or challenge?

Collecting the filtered indices into a PrimitiveArray takes a lot of memory and time. Using an iterator-based design instead would save a lot of intermediate memory and could speed up the join operation due to fewer cache misses and less copying.

Describe the solution you'd like

Create a version of arrow's compute::take kernel that accepts an iterator of indices. Benchmark and figure out where it's worth using Iterators over PrimitiveArrays.
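As a rough illustration of the proposed API shape, here is a minimal sketch over plain slices. The `take_with_iter` name and signature are this issue's proposal, not an existing arrow API; the real kernel would operate on `PrimitiveArray` and handle validity/nulls.

```rust
// Hypothetical sketch: gather values at the positions yielded by an
// iterator, without first materializing the indices into an array.
fn take_with_iter<T: Copy>(values: &[T], indices: impl Iterator<Item = usize>) -> Vec<T> {
    indices.map(|i| values[i]).collect()
}

fn main() {
    let values = [10, 20, 30, 40, 50];
    // Take every other element; no index array is allocated.
    let taken = take_with_iter(&values, (0..values.len()).step_by(2));
    assert_eq!(taken, vec![10, 30, 50]);
}
```

The interesting question (benchmarked below in the thread) is whether skipping the index materialization beats the cache-friendly, batched access pattern of the existing `take`.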

Describe alternatives you've considered

No response

Additional context

I have tried to create a POC but I seem to get different results whenever I benchmark it and I couldn't figure out what's wrong. I also had lots of trouble with taking the ownership of the values array of PrimitiveArrays which are guaranteed to not have nulls (namely the indices cache and the mask produced by the filter).

Furthermore, it's possible to use the mask's .values().set_indices() iterator to generate the indices to be used in the join, because the left-right index pairs are a function of the index into the mask, in the form (index % left_batch.num_rows(), index / left_batch.num_rows()), and it vectorizes nicely.
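The mapping above can be sketched with plain types; a `bool` slice stands in for arrow's `BooleanBuffer` and its `set_indices()` iterator, and the function name is illustrative:

```rust
// Each set bit at position `i` in the cross-product mask maps to the
// pair (i % left_rows, i / left_rows), as described above.
fn mask_to_index_pairs(mask: &[bool], left_rows: usize) -> Vec<(usize, usize)> {
    mask.iter()
        .enumerate()
        .filter_map(|(i, &keep)| keep.then_some((i % left_rows, i / left_rows)))
        .collect()
}

fn main() {
    // 3 left rows x 2 right rows; mask positions 0, 2, and 4 survive the filter.
    let mask = [true, false, true, false, true, false];
    assert_eq!(mask_to_index_pairs(&mask, 3), vec![(0, 0), (2, 0), (1, 1)]);
}
```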

I plan to share the entirety of my POC (here's a potential implementation of take_with_iter) and benchmarks once I can tame them and generate the same results deterministically.

@alihan-synnada alihan-synnada added the enhancement New feature or request label Dec 2, 2024
@ozankabak
Contributor

@alamb I think you might have some useful guidance here. Do you think it makes sense to have something like take_with_iter?

@Dandandan
Contributor

Dandandan commented Dec 2, 2024

I wonder if an approach like the following could be taken:

  • Between joins: reuse the allocations of indices from the previous iteration (rather than allocating new ones)
  • For filtering indices: rather than collecting into a new array, use retain

This avoids allocating new memory but is still compatible with the current take.
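A minimal sketch of these two ideas with plain `Vec`s (the `JoinScratch` type and method are hypothetical, just to show the pattern):

```rust
// Hypothetical scratch struct: the index buffer outlives a single
// batch, so `clear` + `extend` reuse its allocation, and `retain`
// filters in place instead of collecting into a new array.
struct JoinScratch {
    indices: Vec<u64>,
}

impl JoinScratch {
    fn process_batch(&mut self, num_pairs: u64, keep: impl Fn(u64) -> bool) -> &[u64] {
        self.indices.clear();              // reuse the previous allocation
        self.indices.extend(0..num_pairs); // generate candidate indices
        self.indices.retain(|&i| keep(i)); // filter in place
        &self.indices
    }
}

fn main() {
    let mut scratch = JoinScratch { indices: Vec::new() };
    let kept = scratch.process_batch(10, |i| i % 2 == 0);
    assert_eq!(kept, &[0, 2, 4, 6, 8]);
}
```

After the first few batches the buffer's capacity stabilizes, so steady-state processing allocates nothing; the filtered slice can then be handed to the existing take kernel unchanged.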

@alamb
Contributor

alamb commented Dec 2, 2024

I agree with @Dandandan -- and specifically it isn't clear to me that an iterator-based approach will be faster than using the take kernel -- I suspect the bottleneck will be the copy that is happening as part of take, not the actual management of the indexes.

If the issue is that the indices themselves take up too much space, then perhaps we can do some more effort to incrementally generate them and reuse the arrays, as suggested by @Dandandan

Here is an example in grouping where we reuse indexes:

```rust
/// scratch space for the current input [`RecordBatch`] being
/// processed. Reused across batches here to avoid reallocations
current_group_indices: Vec<usize>,
```

@ozankabak
Contributor

I think the idea is that there are some cases where indices have a pattern and can be "procedurally generated" (in CV-speak). Is that right, @alihan-synnada?

@alamb
Contributor

alamb commented Dec 2, 2024

> I think the idea is that there are some cases where indices have a pattern and can be "procedurally generated" (in CV-speak). Is that right, @alihan-synnada?

Ah -- makes sense -- check out how filter does it in arrow-rs:

https://github.com/apache/arrow-rs/blob/b3f9355ae042071ff33b3f957295c6e97639fe2c/arrow-select/src/filter.rs#L281-L293

Maybe we can expose that API somehow 🤔

@comphead
Contributor

comphead commented Dec 2, 2024

Related to apache/datafusion-comet#901 (comment)

@Rachelint
Contributor

> I agree with @Dandandan -- and specifically it isn't clear to me that an iterator-based approach will be faster than using the take kernel -- I suspect the bottleneck will be the copy that is happening as part of take, not the actual management of the indexes.
>
> If the issue is that the indices themselves take up too much space, then perhaps we can do some more effort to incrementally generate them and reuse the arrays, as suggested by @Dandandan
>
> Here is an example in grouping where we reuse indexes:
>
> ```rust
> /// scratch space for the current input [`RecordBatch`] being
> /// processed. Reused across batches here to avoid reallocations
> current_group_indices: Vec<usize>,
> ```

I agree that take may be the bottleneck. I tried a take + eq approach in the vectorized comparison of primitives in https://github.com/Rachelint/arrow-datafusion/tree/optimize-vectorized-operations-bak

Even when trying hard to reuse the buffer, take still costs a lot of CPU in the flamegraph of the newly added benchmark in this PR.

@alihan-synnada
Contributor Author

alihan-synnada commented Dec 3, 2024

PoC Link. It requires a patched version of arrow-buffer that derives Clone for BitIndexIterator. The benchmark might be misleading because I had trouble with lifetimes and ended up using Box::leak as a last resort.

I'm not very confident in the way I set up the benchmark, but I think the results are promising. Note that selectivity only goes up to 30 because it's really slow after that point. Selectivity is used in the filter as "sum of all columns % 100 = selectivity".

Batch size is in log2 on the chart (i.e. batch size 13 means 2^13 = 8192). So it isn't really useful for the default batch size of 8192, but anything between 2^4 (16) and 2^10 (1024) might benefit from it.

[Figure_1: benchmark results chart]

@Dandandan
Contributor

AFAIK the algorithm of NLJ is as follows:

  1. Generating indices in a certain pattern
  2. Taking left / right side based on pattern for columns in filter
  3. Creating filter based on expression
  4. Filtering indices based on expression
  5. Taking values of output columns based on filtered indices

Optionally some extra logic per join type is applied (updating visited rows).

I think there are a couple of things that might be optimized in nested loop join:

  • Taking values from the left side for columns in the filter might be reused between iterations (as this might be the same every time)
  • Ideally: we don't create the indices, take them, create a filter and apply the filters, but create the output indices or even the output directly. This would entail writing a specific implementation which, instead of calling the different kernels, combines the individual operations, just like @Rachelint did for hash aggregates. Possibly take with an iterator could help here, as it would avoid implementing this bit over and over for each "fused" implementation. I think we need to see an example of this first.
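To make the "fused" idea in the last bullet concrete, here is a minimal sketch with hypothetical names; `predicate` stands in for the evaluated join filter expression, which in the real operator would be an arrow expression over the candidate rows:

```rust
// Generate only the surviving (left, right) output index pairs in one
// pass, instead of materializing candidate indices, taking, filtering,
// and taking again.
fn fused_nlj_indices(
    left_rows: usize,
    right_rows: usize,
    predicate: impl Fn(usize, usize) -> bool,
) -> (Vec<usize>, Vec<usize>) {
    let mut left_idx = Vec::new();
    let mut right_idx = Vec::new();
    for r in 0..right_rows {
        for l in 0..left_rows {
            if predicate(l, r) {
                left_idx.push(l);
                right_idx.push(r);
            }
        }
    }
    (left_idx, right_idx)
}

fn main() {
    // 2x2 cross product filtered down to matching rows.
    let (l, r) = fused_nlj_indices(2, 2, |l, r| l == r);
    assert_eq!((l, r), (vec![0, 1], vec![0, 1]));
}
```

The resulting index vectors feed the final take of the output columns directly, skipping the intermediate candidate arrays entirely.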

@comphead
Contributor

comphead commented Dec 3, 2024

> Optionally some extra logic per join type is applied (updating visited rows).
>
> I think there are a couple of things that might be optimized in nested loop join:
>
>   • taking values from the left side for columns in the filter might be reused between iterations (as this might be the same every time)
>   • ideally: we don't create the indices, take them, create a filter and apply the filters, but create the output indices or even the output directly. This would entail writing a specific implementation which, instead of calling the different kernels, combines the individual operations, just like @Rachelint did for hash aggregates. Possibly take with an iterator could help here, as it would avoid implementing this bit over and over for each "fused" implementation. I think we need to see an example of this first.

That makes sense. Alternatively, there is a good set of NLJ optimizations in this video: https://www.youtube.com/watch?v=RcEW0P8iVTc

I went quickly through our NLJ implementation and it looks like a page- (batch-) based NLJ, and there are some optimizations from the video that could potentially be applied.

@Dandandan
Contributor

Dandandan commented Dec 3, 2024

> That makes sense. Alternatively, there is a good set of NLJ optimizations in this video: https://www.youtube.com/watch?v=RcEW0P8iVTc

Yeah, nice! Thanks for bringing this to our attention
