perf: Use different binview dedup strategy depending on chunks ratio #20451

Merged: 1 commit merged on Dec 26, 2024
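The change picks between two buffer-deduplication strategies when gathering from a binview ChunkedArray, based on how the number of gather indices compares to the number of chunks: with fewer indices than chunks, buffers are deduped lazily while the views are created (only buffers actually referenced by a gathered view are carried over); otherwise every chunk's buffers are deduped once up front. A minimal sketch of the dispatch follows; `DedupStrategy` and `pick_strategy` are illustrative names, not part of the PR:

// Sketch of the strategy choice: the index count corresponds to `by.len()`
// and the chunk count to `ca.n_chunks()` in the diff below.
enum DedupStrategy {
    // Fewer gather indices than chunks: dedup lazily, appending a buffer to
    // the output list only when a gathered view first references it.
    WhileCreatingViews,
    // At least as many indices as chunks: dedup all chunk buffers once up
    // front, then rewrite views against precomputed buffer offsets.
    UpFront,
}

fn pick_strategy(n_gather_indices: usize, n_chunks: usize) -> DedupStrategy {
    if n_gather_indices < n_chunks {
        DedupStrategy::WhileCreatingViews
    } else {
        DedupStrategy::UpFront
    }
}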
132 changes: 128 additions & 4 deletions crates/polars-ops/src/chunked_array/gather/chunked.rs
@@ -4,6 +4,7 @@ use arrow::array::{Array, BinaryViewArrayGeneric, View, ViewType};
use arrow::bitmap::BitmapBuilder;
use arrow::buffer::Buffer;
use arrow::legacy::trusted_len::TrustedLenPush;
use hashbrown::hash_map::Entry;
use polars_core::prelude::gather::_update_gather_sorted_flag;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
@@ -472,7 +473,53 @@ where
    };

    arc_data_buffers = arr.data_buffers().clone();
}
// Dedup the buffers while creating the views.
else if by.len() < ca.n_chunks() {
    let mut buffer_idxs = PlHashMap::with_capacity(8);
    let mut buffers = Vec::with_capacity(8);

    validity = if ca.has_nulls() {
        let mut validity = BitmapBuilder::with_capacity(by.len());
        for id in by.iter() {
            let (chunk_idx, array_idx) = id.extract();

            let arr = ca.downcast_get_unchecked(chunk_idx as usize);
            if arr.is_null_unchecked(array_idx as usize) {
                views.push_unchecked(View::default());
                validity.push_unchecked(false);
            } else {
                let view = *arr.views().get_unchecked(array_idx as usize);
                views.push_unchecked(update_view_and_dedup(
                    view,
                    arr.data_buffers(),
                    &mut buffer_idxs,
                    &mut buffers,
                ));
                validity.push_unchecked(true);
            }
        }
        Some(validity.freeze())
    } else {
        for id in by.iter() {
            let (chunk_idx, array_idx) = id.extract();

            let arr = ca.downcast_get_unchecked(chunk_idx as usize);
            let view = *arr.views().get_unchecked(array_idx as usize);
            views.push_unchecked(update_view_and_dedup(
                view,
                arr.data_buffers(),
                &mut buffer_idxs,
                &mut buffers,
            ));
        }
        None
    };

    arc_data_buffers = buffers.into();
}
// Dedup the buffers up front
else {
    let (buffers, buffer_offsets) = dedup_buffers(ca);

    validity = if ca.has_nulls() {
@@ -531,13 +578,36 @@ unsafe fn rewrite_view(mut view: View, chunk_idx: IdxSize, buffer_offsets: &[u32
    view
}

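/// Rewrites `view` to index into the deduplicated `buffers` list, appending
/// the referenced buffer the first time it is seen. Views with length <= 12
/// store their bytes inline and reference no buffer (the Arrow binary-view
/// layout inlines payloads up to 12 bytes), so they pass through unchanged.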
unsafe fn update_view_and_dedup(
    mut view: View,
    orig_buffers: &[Buffer<u8>],
    buffer_idxs: &mut PlHashMap<(*const u8, usize), u32>,
    buffers: &mut Vec<Buffer<u8>>,
) -> View {
    if view.length > 12 {
        // Dedup on pointer + length.
        let orig_buffer = orig_buffers.get_unchecked(view.buffer_idx as usize);
        view.buffer_idx =
            match buffer_idxs.entry((orig_buffer.as_slice().as_ptr(), orig_buffer.len())) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(v) => {
                    let buffer_idx = buffers.len() as u32;
                    buffers.push(orig_buffer.clone());
                    v.insert(buffer_idx);
                    buffer_idx
                },
            };
    }
    view
}

fn dedup_buffers<T, V>(ca: &ChunkedArray<T>) -> (Vec<Buffer<u8>>, Vec<u32>)
where
    T: PolarsDataType<Array = BinaryViewArrayGeneric<V>>,
    V: ViewType + ?Sized,
{
    // Dedup buffers up front. Note: don't do this during view update, as that is often much
    // more costly.
    let mut buffers = Vec::with_capacity(ca.chunks().len());
    // Don't need to include the length, as we look at the arc pointers, which are immutable.
    let mut buffers_dedup = PlHashSet::with_capacity(ca.chunks().len());
@@ -598,7 +668,61 @@ where
    }

    arr.data_buffers().clone()
}
// Dedup the buffers while creating the views.
else if by.len() < ca.n_chunks() {
    let mut buffer_idxs = PlHashMap::with_capacity(8);
    let mut buffers = Vec::with_capacity(8);

    if ca.has_nulls() {
        for id in by.iter() {
            let (chunk_idx, array_idx) = id.extract();

            if id.is_null() {
                views.push_unchecked(View::default());
                validity.push_unchecked(false);
            } else {
                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                if arr.is_null_unchecked(array_idx as usize) {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let view = *arr.views().get_unchecked(array_idx as usize);
                    views.push_unchecked(update_view_and_dedup(
                        view,
                        arr.data_buffers(),
                        &mut buffer_idxs,
                        &mut buffers,
                    ));
                    validity.push_unchecked(true);
                }
            }
        }
    } else {
        for id in by.iter() {
            let (chunk_idx, array_idx) = id.extract();

            if id.is_null() {
                views.push_unchecked(View::default());
                validity.push_unchecked(false);
            } else {
                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                let view = *arr.views().get_unchecked(array_idx as usize);
                views.push_unchecked(update_view_and_dedup(
                    view,
                    arr.data_buffers(),
                    &mut buffer_idxs,
                    &mut buffers,
                ));
                validity.push_unchecked(true);
            }
        }
    };

    buffers.into()
}
// Dedup the buffers up front
else {
    let (buffers, buffer_offsets) = dedup_buffers(ca);

    if ca.has_nulls() {