From c6769c24ffe0503439efd12b1523af8d4acdb804 Mon Sep 17 00:00:00 2001
From: Orson Peters
Date: Tue, 24 Dec 2024 14:09:47 +0100
Subject: [PATCH] perf: Don't always take all data buffers when gathering
 views (#20435)

---
 .../src/chunked_array/gather/chunked.rs | 336 ++++++++++--------
 1 file changed, 196 insertions(+), 140 deletions(-)

diff --git a/crates/polars-ops/src/chunked_array/gather/chunked.rs b/crates/polars-ops/src/chunked_array/gather/chunked.rs
index 1a83cdcfdfef..644e1ac4636d 100644
--- a/crates/polars-ops/src/chunked_array/gather/chunked.rs
+++ b/crates/polars-ops/src/chunked_array/gather/chunked.rs
@@ -1,13 +1,15 @@
 use std::borrow::Cow;
 use std::fmt::Debug;
 
-use arrow::array::{Array, BinaryViewArray, View};
-use arrow::bitmap::MutableBitmap;
+use arrow::array::{Array, BinaryViewArrayGeneric, View, ViewType};
+use arrow::bitmap::BitmapBuilder;
 use arrow::buffer::Buffer;
 use arrow::legacy::trusted_len::TrustedLenPush;
+use hashbrown::hash_map::Entry;
 use polars_core::prelude::gather::_update_gather_sorted_flag;
 use polars_core::prelude::*;
 use polars_core::series::IsSorted;
+use polars_core::utils::Container;
 use polars_core::with_match_physical_numeric_polars_type;
 
 use crate::frame::IntoDf;
@@ -147,14 +149,11 @@ impl TakeChunked for Series {
             },
             Binary => {
                 let ca = phys.binary().unwrap();
-                let out = take_unchecked_binview(ca, by, sorted);
-                out.into_series()
+                take_unchecked_binview(ca, by, sorted).into_series()
             },
             String => {
                 let ca = phys.str().unwrap();
-                let ca = ca.as_binary();
-                let out = take_unchecked_binview(&ca, by, sorted);
-                out.to_string_unchecked().into_series()
+                take_unchecked_binview(ca, by, sorted).into_series()
             },
             List(_) => {
                 let ca = phys.list().unwrap();
@@ -204,14 +203,11 @@ impl TakeChunked for Series {
             },
             Binary => {
                 let ca = phys.binary().unwrap();
-                let out = take_unchecked_binview_opt(ca, by);
-                out.into_series()
+                take_unchecked_binview_opt(ca, by).into_series()
             },
             String => {
                 let ca = phys.str().unwrap();
-                let ca = ca.as_binary();
-                let out = take_unchecked_binview_opt(&ca, by);
-                out.to_string_unchecked().into_series()
+                take_unchecked_binview_opt(ca, by).into_series()
             },
             List(_) => {
                 let ca = phys.list().unwrap();
@@ -370,174 +366,234 @@ unsafe fn take_opt_unchecked_object(s: &Series, by: &[ChunkId])
     builder.to_series()
 }
 
-#[allow(clippy::unnecessary_cast)]
-#[inline(always)]
-unsafe fn rewrite_view(mut view: View, chunk_idx: IdxSize, buffer_offsets: &[u32]) -> View {
+unsafe fn update_view(
+    mut view: View,
+    orig_buffers: &[Buffer<u8>],
+    buffer_idxs: &mut PlHashMap<(*const u8, usize), u32>,
+    buffers: &mut Vec<Buffer<u8>>,
+) -> View {
     if view.length > 12 {
-        let base_offset = *buffer_offsets.get_unchecked(chunk_idx as usize);
-        view.buffer_idx += base_offset;
+        // Dedup on pointer + length.
+        let orig_buffer = orig_buffers.get_unchecked(view.buffer_idx as usize);
+        view.buffer_idx =
+            match buffer_idxs.entry((orig_buffer.as_slice().as_ptr(), orig_buffer.len())) {
+                Entry::Occupied(o) => *o.get(),
+                Entry::Vacant(v) => {
+                    let buffer_idx = buffers.len() as u32;
+                    buffers.push(orig_buffer.clone());
+                    v.insert(buffer_idx);
+                    buffer_idx
+                },
+            };
     }
     view
 }
 
-fn create_buffer_offsets(ca: &BinaryChunked) -> Vec<u32> {
-    let mut buffer_offsets = Vec::with_capacity(ca.chunks().len() + 1);
-    let mut cumsum = 0u32;
-    buffer_offsets.push(cumsum);
-    buffer_offsets.extend(ca.downcast_iter().map(|arr| {
-        cumsum += arr.data_buffers().len() as u32;
-        cumsum
-    }));
-    buffer_offsets
-}
-
-#[allow(clippy::unnecessary_cast)]
-unsafe fn take_unchecked_binview(
-    ca: &BinaryChunked,
+unsafe fn take_unchecked_binview<T, V>(
+    ca: &ChunkedArray<T>,
     by: &[ChunkId],
     sorted: IsSorted,
-) -> BinaryChunked {
-    let views = ca
-        .downcast_iter()
-        .map(|arr| arr.views().as_slice())
-        .collect::<Vec<_>>();
-    let buffer_offsets = create_buffer_offsets(ca);
-
-    let buffers: Arc<[Buffer<u8>]> = ca
-        .downcast_iter()
-        .flat_map(|arr| arr.data_buffers().as_ref())
-        .cloned()
-        .collect();
-
-    let (views, validity) = if ca.null_count() == 0 {
-        let views = by
-            .iter()
-            .map(|chunk_id| {
-                let (chunk_idx, array_idx) = chunk_id.extract();
-                let array_idx = array_idx as usize;
-
-                let target = *views.get_unchecked(chunk_idx as usize);
-                let view = *target.get_unchecked(array_idx);
-                rewrite_view(view, chunk_idx, &buffer_offsets)
-            })
-            .collect::<Vec<_>>();
+) -> ChunkedArray<T>
+where
+    T: PolarsDataType<Array = BinaryViewArrayGeneric<V>>,
+    V: ViewType + ?Sized,
+{
+    let mut views = Vec::with_capacity(by.len());
+    let (validity, arc_data_buffers);
+
+    // If we can cheaply clone the list of buffers from the ChunkedArray we will,
+    // otherwise we will only clone those buffers we need.
+    if ca.n_chunks() == 1 {
+        let arr = ca.downcast_iter().next().unwrap();
+        let arr_views = arr.views();
+
+        validity = if arr.has_nulls() {
+            let mut validity = BitmapBuilder::with_capacity(by.len());
+            for id in by.iter() {
+                let (chunk_idx, array_idx) = id.extract();
+                debug_assert!(chunk_idx == 0);
+                if arr.is_null_unchecked(array_idx as usize) {
+                    views.push_unchecked(View::default());
+                    validity.push_unchecked(false);
+                } else {
+                    views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
+                    validity.push_unchecked(true);
+                }
+            }
+            Some(validity.freeze())
+        } else {
+            for id in by.iter() {
+                let (chunk_idx, array_idx) = id.extract();
+                debug_assert!(chunk_idx == 0);
+                views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
+            }
+            None
+        };
 
-        (views, None)
+        arc_data_buffers = arr.data_buffers().clone();
     } else {
-        let targets = ca.downcast_iter().collect::<Vec<_>>();
-
-        let mut mut_views = Vec::with_capacity(by.len());
-        let mut validity = MutableBitmap::with_capacity(by.len());
-
-        for id in by.iter() {
-            let (chunk_idx, array_idx) = id.extract();
-            let array_idx = array_idx as usize;
-
-            let target = *targets.get_unchecked(chunk_idx as usize);
-            if target.is_null_unchecked(array_idx) {
-                mut_views.push_unchecked(View::default());
-                validity.push_unchecked(false)
-            } else {
-                let target = *views.get_unchecked(chunk_idx as usize);
-                let view = *target.get_unchecked(array_idx);
-                let view = rewrite_view(view, chunk_idx, &buffer_offsets);
-                mut_views.push_unchecked(view);
-                validity.push_unchecked(true)
+        let mut buffer_idxs = PlHashMap::with_capacity(8);
+        let mut buffers = Vec::with_capacity(8);
+
+        validity = if ca.has_nulls() {
+            let mut validity = BitmapBuilder::with_capacity(by.len());
+            for id in by.iter() {
+                let (chunk_idx, array_idx) = id.extract();
+
+                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
+                if arr.is_null_unchecked(array_idx as usize) {
+                    views.push_unchecked(View::default());
+                    validity.push_unchecked(false);
+                } else {
+                    let view = *arr.views().get_unchecked(array_idx as usize);
+                    views.push_unchecked(update_view(
+                        view,
+                        arr.data_buffers(),
+                        &mut buffer_idxs,
+                        &mut buffers,
+                    ));
+                    validity.push_unchecked(true);
+                }
             }
-        }
+            Some(validity.freeze())
+        } else {
+            for id in by.iter() {
+                let (chunk_idx, array_idx) = id.extract();
 
-        (mut_views, Some(validity.freeze()))
+                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
+                let view = *arr.views().get_unchecked(array_idx as usize);
+                views.push_unchecked(update_view(
+                    view,
+                    arr.data_buffers(),
+                    &mut buffer_idxs,
+                    &mut buffers,
+                ));
+            }
+            None
+        };
+
+        arc_data_buffers = buffers.into();
     };
 
-    let arr = BinaryViewArray::new_unchecked_unknown_md(
-        ArrowDataType::BinaryView,
+    let arr = BinaryViewArrayGeneric::<V>::new_unchecked_unknown_md(
+        V::DATA_TYPE,
         views.into(),
-        buffers,
+        arc_data_buffers,
         validity,
         None,
-    )
-    .maybe_gc();
+    );
 
-    let mut out = BinaryChunked::with_chunk(ca.name().clone(), arr);
+    let mut out = ChunkedArray::with_chunk(ca.name().clone(), arr.maybe_gc());
     let sorted_flag = _update_gather_sorted_flag(ca.is_sorted_flag(), sorted);
     out.set_sorted_flag(sorted_flag);
     out
 }
 
-unsafe fn take_unchecked_binview_opt(
-    ca: &BinaryChunked,
+unsafe fn take_unchecked_binview_opt<T, V>(
+    ca: &ChunkedArray<T>,
     by: &[ChunkId],
-) -> BinaryChunked {
-    let views = ca
-        .downcast_iter()
-        .map(|arr| arr.views().as_slice())
-        .collect::<Vec<_>>();
-    let buffers: Arc<[Buffer<u8>]> = ca
-        .downcast_iter()
-        .flat_map(|arr| arr.data_buffers().as_ref())
-        .cloned()
-        .collect();
-    let buffer_offsets = create_buffer_offsets(ca);
-
-    let targets = ca.downcast_iter().collect::<Vec<_>>();
-
-    let mut mut_views = Vec::with_capacity(by.len());
-    let mut validity = MutableBitmap::with_capacity(by.len());
-
-    let (views, validity) = if ca.null_count() == 0 {
-        for id in by.iter() {
-            if id.is_null() {
-                mut_views.push_unchecked(View::default());
-                validity.push_unchecked(false)
-            } else {
-                let (chunk_idx, array_idx) = id.extract();
-                let array_idx = array_idx as usize;
+) -> ChunkedArray<T>
+where
+    T: PolarsDataType<Array = BinaryViewArrayGeneric<V>>,
+    V: ViewType + ?Sized,
+{
+    let mut views = Vec::with_capacity(by.len());
+    let mut validity = BitmapBuilder::with_capacity(by.len());
 
-                let target = *views.get_unchecked(chunk_idx as usize);
-                let view = *target.get_unchecked(array_idx);
-                let view = rewrite_view(view, chunk_idx, &buffer_offsets);
+    // If we can cheaply clone the list of buffers from the ChunkedArray we will,
+    // otherwise we will only clone those buffers we need.
+    let arc_data_buffers = if ca.n_chunks() == 1 {
+        let arr = ca.downcast_iter().next().unwrap();
+        let arr_views = arr.views();
 
-                mut_views.push_unchecked(view);
-                validity.push_unchecked(true)
+        if arr.has_nulls() {
+            for id in by.iter() {
+                let (chunk_idx, array_idx) = id.extract();
+                debug_assert!(id.is_null() || chunk_idx == 0);
+                if id.is_null() || arr.is_null_unchecked(array_idx as usize) {
+                    views.push_unchecked(View::default());
+                    validity.push_unchecked(false);
+                } else {
+                    views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
+                    validity.push_unchecked(true);
+                }
+            }
+        } else {
+            for id in by.iter() {
+                let (chunk_idx, array_idx) = id.extract();
+                debug_assert!(id.is_null() || chunk_idx == 0);
+                if id.is_null() {
+                    views.push_unchecked(View::default());
+                    validity.push_unchecked(false);
+                } else {
+                    views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
+                    validity.push_unchecked(true);
+                }
             }
         }
-        (mut_views, Some(validity.freeze()))
+
+        arr.data_buffers().clone()
     } else {
-        for id in by.iter() {
-            if id.is_null() {
-                mut_views.push_unchecked(View::default());
-                validity.push_unchecked(false)
-            } else {
+        let mut buffer_idxs = PlHashMap::with_capacity(8);
+        let mut buffers = Vec::with_capacity(8);
+
+        if ca.has_nulls() {
+            for id in by.iter() {
                 let (chunk_idx, array_idx) = id.extract();
-                let array_idx = array_idx as usize;
 
-                let target = *targets.get_unchecked(chunk_idx as usize);
-                if target.is_null_unchecked(array_idx) {
-                    mut_views.push_unchecked(View::default());
-                    validity.push_unchecked(false)
+                if id.is_null() {
+                    views.push_unchecked(View::default());
+                    validity.push_unchecked(false);
                 } else {
-                    let target = *views.get_unchecked(chunk_idx as usize);
-                    let view = *target.get_unchecked(array_idx);
-                    let view = rewrite_view(view, chunk_idx, &buffer_offsets);
-                    mut_views.push_unchecked(view);
+                    let arr = ca.downcast_get_unchecked(chunk_idx as usize);
+                    if arr.is_null_unchecked(array_idx as usize) {
+                        views.push_unchecked(View::default());
+                        validity.push_unchecked(false);
+                    } else {
+                        let view = *arr.views().get_unchecked(array_idx as usize);
+                        views.push_unchecked(update_view(
+                            view,
+                            arr.data_buffers(),
+                            &mut buffer_idxs,
+                            &mut buffers,
+                        ));
+                        validity.push_unchecked(true);
+                    }
+                }
+            }
+        } else {
+            for id in by.iter() {
+                let (chunk_idx, array_idx) = id.extract();
+
+                if id.is_null() {
+                    views.push_unchecked(View::default());
+                    validity.push_unchecked(false);
+                } else {
+                    let arr = ca.downcast_get_unchecked(chunk_idx as usize);
+                    let view = *arr.views().get_unchecked(array_idx as usize);
+                    views.push_unchecked(update_view(
+                        view,
+                        arr.data_buffers(),
+                        &mut buffer_idxs,
+                        &mut buffers,
+                    ));
                     validity.push_unchecked(true);
                 }
             }
-        }
+        };
 
-        (mut_views, Some(validity.freeze()))
+        buffers.into()
     };
 
-    let arr = BinaryViewArray::new_unchecked_unknown_md(
-        ArrowDataType::BinaryView,
+    let arr = BinaryViewArrayGeneric::<V>::new_unchecked_unknown_md(
+        V::DATA_TYPE,
         views.into(),
-        buffers,
-        validity,
+        arc_data_buffers,
+        Some(validity.freeze()),
         None,
-    )
-    .maybe_gc();
+    );
 
-    BinaryChunked::with_chunk(ca.name().clone(), arr)
+    ChunkedArray::with_chunk(ca.name().clone(), arr.maybe_gc())
 }
 
 #[cfg(test)]