Skip to content

Commit

Permalink
perf: Use different binview dedup strategy depending on chunks ratio (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Dec 26, 2024
1 parent cea65be commit aaacdbe
Showing 1 changed file with 128 additions and 4 deletions.
132 changes: 128 additions & 4 deletions crates/polars-ops/src/chunked_array/gather/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use arrow::array::{Array, BinaryViewArrayGeneric, View, ViewType};
use arrow::bitmap::BitmapBuilder;
use arrow::buffer::Buffer;
use arrow::legacy::trusted_len::TrustedLenPush;
use hashbrown::hash_map::Entry;
use polars_core::prelude::gather::_update_gather_sorted_flag;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
Expand Down Expand Up @@ -472,7 +473,53 @@ where
};

arc_data_buffers = arr.data_buffers().clone();
} else {
}
// Dedup the buffers while creating the views.
else if by.len() < ca.n_chunks() {
let mut buffer_idxs = PlHashMap::with_capacity(8);
let mut buffers = Vec::with_capacity(8);

validity = if ca.has_nulls() {
let mut validity = BitmapBuilder::with_capacity(by.len());
for id in by.iter() {
let (chunk_idx, array_idx) = id.extract();

let arr = ca.downcast_get_unchecked(chunk_idx as usize);
if arr.is_null_unchecked(array_idx as usize) {
views.push_unchecked(View::default());
validity.push_unchecked(false);
} else {
let view = *arr.views().get_unchecked(array_idx as usize);
views.push_unchecked(update_view_and_dedup(
view,
arr.data_buffers(),
&mut buffer_idxs,
&mut buffers,
));
validity.push_unchecked(true);
}
}
Some(validity.freeze())
} else {
for id in by.iter() {
let (chunk_idx, array_idx) = id.extract();

let arr = ca.downcast_get_unchecked(chunk_idx as usize);
let view = *arr.views().get_unchecked(array_idx as usize);
views.push_unchecked(update_view_and_dedup(
view,
arr.data_buffers(),
&mut buffer_idxs,
&mut buffers,
));
}
None
};

arc_data_buffers = buffers.into();
}
// Dedup the buffers up front
else {
let (buffers, buffer_offsets) = dedup_buffers(ca);

validity = if ca.has_nulls() {
Expand Down Expand Up @@ -531,13 +578,36 @@ unsafe fn rewrite_view(mut view: View, chunk_idx: IdxSize, buffer_offsets: &[u32
view
}

unsafe fn update_view_and_dedup(
mut view: View,
orig_buffers: &[Buffer<u8>],
buffer_idxs: &mut PlHashMap<(*const u8, usize), u32>,
buffers: &mut Vec<Buffer<u8>>,
) -> View {
if view.length > 12 {
// Dedup on pointer + length.
let orig_buffer = orig_buffers.get_unchecked(view.buffer_idx as usize);
view.buffer_idx =
match buffer_idxs.entry((orig_buffer.as_slice().as_ptr(), orig_buffer.len())) {
Entry::Occupied(o) => *o.get(),
Entry::Vacant(v) => {
let buffer_idx = buffers.len() as u32;
buffers.push(orig_buffer.clone());
v.insert(buffer_idx);
buffer_idx
},
};
}
view
}

fn dedup_buffers<T, V>(ca: &ChunkedArray<T>) -> (Vec<Buffer<u8>>, Vec<u32>)
where
T: PolarsDataType<Array = BinaryViewArrayGeneric<V>>,
V: ViewType + ?Sized,
{
// Dedup buffers up front. Note: don't do this during view update, as this is much more
// costly.
// Dedup buffers up front. Note: don't do this during view update, as this is often is much
// more costly.
let mut buffers = Vec::with_capacity(ca.chunks().len());
// Dont need to include the length, as we look at the arc pointers, which are immutable.
let mut buffers_dedup = PlHashSet::with_capacity(ca.chunks().len());
Expand Down Expand Up @@ -598,7 +668,61 @@ where
}

arr.data_buffers().clone()
} else {
}
// Dedup the buffers while creating the views.
else if by.len() < ca.n_chunks() {
let mut buffer_idxs = PlHashMap::with_capacity(8);
let mut buffers = Vec::with_capacity(8);

if ca.has_nulls() {
for id in by.iter() {
let (chunk_idx, array_idx) = id.extract();

if id.is_null() {
views.push_unchecked(View::default());
validity.push_unchecked(false);
} else {
let arr = ca.downcast_get_unchecked(chunk_idx as usize);
if arr.is_null_unchecked(array_idx as usize) {
views.push_unchecked(View::default());
validity.push_unchecked(false);
} else {
let view = *arr.views().get_unchecked(array_idx as usize);
views.push_unchecked(update_view_and_dedup(
view,
arr.data_buffers(),
&mut buffer_idxs,
&mut buffers,
));
validity.push_unchecked(true);
}
}
}
} else {
for id in by.iter() {
let (chunk_idx, array_idx) = id.extract();

if id.is_null() {
views.push_unchecked(View::default());
validity.push_unchecked(false);
} else {
let arr = ca.downcast_get_unchecked(chunk_idx as usize);
let view = *arr.views().get_unchecked(array_idx as usize);
views.push_unchecked(update_view_and_dedup(
view,
arr.data_buffers(),
&mut buffer_idxs,
&mut buffers,
));
validity.push_unchecked(true);
}
}
};

buffers.into()
}
// Dedup the buffers up front
else {
let (buffers, buffer_offsets) = dedup_buffers(ca);

if ca.has_nulls() {
Expand Down

0 comments on commit aaacdbe

Please # to comment.