Skip to content

ARROW-11291: [Rust] Add extend to MutableBuffer (-20% for arithmetic, -97% for length) #9235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
15 changes: 15 additions & 0 deletions rust/arrow/benches/buffer_create.rs
Original file line number Diff line number Diff line change
@@ -39,6 +39,17 @@ fn mutable_buffer(data: &[Vec<u32>], capacity: usize) -> Buffer {
})
}

/// Benchmark body: creates a `MutableBuffer` with the requested initial
/// `capacity`, appends every `u32` of every vector in `data` through
/// `extend`, and returns the result converted into a `Buffer`.
///
/// The result is passed through `criterion::black_box`.
fn mutable_buffer_extend(data: &[Vec<u32>], capacity: usize) -> Buffer {
    let mut buffer = MutableBuffer::new(capacity);

    for chunk in data {
        buffer.extend(chunk.iter().copied());
    }

    criterion::black_box(buffer.into())
}

fn from_slice(data: &[Vec<u32>], capacity: usize) -> Buffer {
criterion::black_box({
let mut a = Vec::<u32>::with_capacity(capacity);
@@ -72,6 +83,10 @@ fn benchmark(c: &mut Criterion) {

c.bench_function("mutable", |b| b.iter(|| mutable_buffer(&data, 0)));

c.bench_function("mutable extend", |b| {
b.iter(|| mutable_buffer_extend(&data, 0))
});

c.bench_function("mutable prepared", |b| {
b.iter(|| mutable_buffer(&data, byte_cap))
});
14 changes: 6 additions & 8 deletions rust/arrow/benches/length_kernel.rs
Original file line number Diff line number Diff line change
@@ -24,25 +24,23 @@ extern crate arrow;
use arrow::array::*;
use arrow::compute::kernels::length::length;

fn bench_length() {
fn bench_length(array: &StringArray) {
criterion::black_box(length(array).unwrap());
}

fn add_benchmark(c: &mut Criterion) {
fn double_vec<T: Clone>(v: Vec<T>) -> Vec<T> {
[&v[..], &v[..]].concat()
}

// double ["one", "on", "o", ""] 10 times
let mut values = vec!["one", "on", "o", ""];
let mut expected = vec![3, 2, 1, 0];
for _ in 0..10 {
values = double_vec(values);
expected = double_vec(expected);
}
let array = StringArray::from(values);

criterion::black_box(length(&array).unwrap());
}

fn add_benchmark(c: &mut Criterion) {
c.bench_function("length", |b| b.iter(bench_length));
c.bench_function("length", |b| b.iter(|| bench_length(&array)));
}

criterion_group!(benches, add_benchmark);
4 changes: 2 additions & 2 deletions rust/arrow/src/array/transform/fixed_binary.rs
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend {
let bytes = &values[i * size..(i + 1) * size];
values_buffer.extend_from_slice(bytes);
} else {
values_buffer.extend(size);
values_buffer.extend_zeros(size);
}
})
},
@@ -61,5 +61,5 @@ pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
};

let values_buffer = &mut mutable.buffer1;
values_buffer.extend(len * size);
values_buffer.extend_zeros(len * size);
}
2 changes: 1 addition & 1 deletion rust/arrow/src/array/transform/primitive.rs
Original file line number Diff line number Diff line change
@@ -36,5 +36,5 @@ pub(super) fn extend_nulls<T: ArrowNativeType>(
mutable: &mut _MutableArrayData,
len: usize,
) {
mutable.buffer1.extend(len * size_of::<T>());
mutable.buffer1.extend_zeros(len * size_of::<T>());
}
230 changes: 219 additions & 11 deletions rust/arrow/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -23,19 +23,19 @@ use packed_simd::u8x64;

use crate::{
bytes::{Bytes, Deallocation},
datatypes::ToByteSlice,
datatypes::{ArrowNativeType, ToByteSlice},
ffi,
};

use std::convert::AsRef;
use std::fmt::Debug;
use std::iter::FromIterator;
use std::ops::{BitAnd, BitOr, Not};
use std::ptr::NonNull;
use std::sync::Arc;
use std::{convert::AsRef, usize};

#[cfg(feature = "avx512")]
use crate::arch::avx512::*;
use crate::datatypes::ArrowNativeType;
use crate::error::{ArrowError, Result};
use crate::memory;
use crate::util::bit_chunk_iterator::BitChunks;
@@ -697,6 +697,7 @@ unsafe impl Sync for Buffer {}
unsafe impl Send for Buffer {}

impl From<MutableBuffer> for Buffer {
#[inline]
fn from(buffer: MutableBuffer) -> Self {
buffer.into_buffer()
}
@@ -727,13 +728,14 @@ pub struct MutableBuffer {

impl MutableBuffer {
/// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
#[inline]
pub fn new(capacity: usize) -> Self {
let new_capacity = bit_util::round_upto_multiple_of_64(capacity);
let ptr = memory::allocate_aligned(new_capacity);
let capacity = bit_util::round_upto_multiple_of_64(capacity);
let ptr = memory::allocate_aligned(capacity);
Self {
data: ptr,
len: 0,
capacity: new_capacity,
capacity,
}
}

@@ -810,10 +812,14 @@ impl MutableBuffer {
pub fn reserve(&mut self, additional: usize) {
let required_cap = self.len + additional;
if required_cap > self.capacity {
let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
let new_capacity = std::cmp::max(new_capacity, self.capacity * 2);
self.data =
unsafe { memory::reallocate(self.data, self.capacity, new_capacity) };
// JUSTIFICATION
// Benefit
// necessity
// Soundness
// `self.data` is valid for `self.capacity`.
let (ptr, new_capacity) =
unsafe { reallocate(self.data, self.capacity, required_cap) };
self.data = ptr;
self.capacity = new_capacity;
}
}
@@ -899,6 +905,7 @@ impl MutableBuffer {
self.into_buffer()
}

#[inline]
fn into_buffer(self) -> Buffer {
let buffer_data = unsafe {
Bytes::new(self.data, self.len, Deallocation::Native(self.capacity))
@@ -963,11 +970,167 @@ impl MutableBuffer {

/// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
#[inline]
pub fn extend(&mut self, additional: usize) {
pub fn extend_zeros(&mut self, additional: usize) {
self.resize(self.len + additional, 0);
}
}

/// Reallocates `ptr` so that it can hold at least `new_capacity` bytes.
///
/// The requested capacity is first rounded up to a multiple of 64 and is
/// then raised to at least twice `old_capacity`. Returns the pointer produced
/// by `memory::reallocate` together with the capacity that was requested
/// from it.
///
/// # Safety
/// `ptr` must be allocated for `old_capacity`.
#[inline]
unsafe fn reallocate(
    ptr: NonNull<u8>,
    old_capacity: usize,
    new_capacity: usize,
) -> (NonNull<u8>, usize) {
    let rounded = bit_util::round_upto_multiple_of_64(new_capacity);
    let doubled = old_capacity * 2;
    // Growing to at least double the old capacity keeps repeated small
    // reservations from reallocating every time.
    let target = rounded.max(doubled);
    (memory::reallocate(ptr, old_capacity, target), target)
}

/// Lets a `MutableBuffer` be extended from any iterator of native values;
/// the work is forwarded to `extend_from_iter`.
impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
    #[inline]
    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
        self.extend_from_iter(iter.into_iter());
    }
}

impl MutableBuffer {
    /// Appends every item yielded by `iterator` to the end of the buffer.
    ///
    /// Space for the iterator's lower size bound is reserved up front. Items
    /// are then written directly into the spare capacity for as long as a
    /// whole item still fits; anything left in the iterator after that is
    /// appended through `push`.
    #[inline]
    fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        &mut self,
        mut iterator: I,
    ) {
        let size = std::mem::size_of::<T>();
        let (lower, _) = iterator.size_hint();
        let additional = lower * size;
        self.reserve(additional);

        // this is necessary because of https://github.com/rust-lang/rust/issues/32155
        // (the guard tracks the length locally and stores it back into
        // `self.len` when it is dropped)
        let mut len = SetLenOnDrop::new(&mut self.len);
        // SAFETY (assumes the buffer invariant `len <= capacity` -- TODO confirm):
        // the offset stays inside the allocation.
        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) as *mut T };
        let capacity = self.capacity;

        // Only write while a whole item still fits in the current allocation.
        while len.local_len + size <= capacity {
            if let Some(item) = iterator.next() {
                unsafe {
                    // `len` is a byte count that callers can advance by an
                    // arbitrary amount (e.g. `extend_zeros`), so `dst` is not
                    // guaranteed to be aligned for `T`. `ptr::write` requires
                    // an aligned pointer; `write_unaligned` does not.
                    std::ptr::write_unaligned(dst, item);
                    dst = dst.add(1);
                }
                len.local_len += size;
            } else {
                break;
            }
        }
        // Commit the new length before `self` is borrowed again below.
        drop(len);

        // The lower bound was only a hint: append whatever is left through `push`.
        iterator.for_each(|item| self.push(item));
    }
}

impl Buffer {
    /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length.
    /// Prefer this to `collect` whenever possible, as it is ~60% faster.
    /// # Example
    /// ```
    /// # use arrow::buffer::Buffer;
    /// let v = vec![1u32];
    /// let iter = v.iter().map(|x| x * 2);
    /// let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };
    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
    /// ```
    /// # Panics
    /// Panics if the iterator reports no upper bound, if the byte length
    /// (`upper * size_of::<T>()`) overflows `usize`, or if the number of items
    /// written differs from the reported upper bound.
    /// # Safety
    /// This method assumes that the iterator's size is correct and is undefined behavior
    /// to use it on an iterator that reports an incorrect length.
    // This implementation is required for two reasons:
    // 1. there is no trait `TrustedLen` in stable rust and therefore
    // we can't specialize `extend` for `TrustedLen` like `Vec` does.
    // 2. `from_trusted_len_iter` is faster.
    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
    ) -> Self {
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
        // A silently wrapped product would allocate fewer bytes than the loop
        // below writes, so overflow must be a hard failure.
        let len = upper
            .checked_mul(std::mem::size_of::<T>())
            .expect("from_trusted_len_iter: byte length overflows usize");

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr() as *mut T;
        for item in iterator {
            // note how there is no reserve here (compared with `extend_from_iter`)
            std::ptr::write(dst, item);
            dst = dst.add(1);
        }
        // This check runs after the writes: it detects a misreported length
        // but cannot undo an overrun that already happened.
        assert_eq!(
            dst.offset_from(buffer.data.as_ptr() as *mut T) as usize,
            upper,
            "Trusted iterator length was not accurately reported"
        );
        buffer.len = len;
        buffer.into()
    }

    /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length or errors
    /// if any of the items of the iterator is an error.
    /// Prefer this to `collect` whenever possible, as it is ~60% faster.
    /// # Errors
    /// Returns the first `Err` yielded by the iterator; items after it are not consumed.
    /// # Panics
    /// Panics if the iterator reports no upper bound, if the byte length
    /// (`upper * size_of::<T>()`) overflows `usize`, or if the number of items
    /// written differs from the reported upper bound.
    /// # Safety
    /// This method assumes that the iterator's size is correct and is undefined behavior
    /// to use it on an iterator that reports an incorrect length.
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = std::result::Result<T, E>>,
    >(
        iterator: I,
    ) -> std::result::Result<Self, E> {
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
        // A silently wrapped product would allocate fewer bytes than the loop
        // below writes, so overflow must be a hard failure.
        let len = upper
            .checked_mul(std::mem::size_of::<T>())
            .expect("try_from_trusted_len_iter: byte length overflows usize");

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr() as *mut T;
        for item in iterator {
            // note how there is no reserve here (compared with `extend_from_iter`)
            std::ptr::write(dst, item?);
            dst = dst.add(1);
        }
        // This check runs after the writes: it detects a misreported length
        // but cannot undo an overrun that already happened.
        assert_eq!(
            dst.offset_from(buffer.data.as_ptr() as *mut T) as usize,
            upper,
            "Trusted iterator length was not accurately reported"
        );
        buffer.len = len;
        Ok(buffer.into())
    }
}

/// Collects an iterator of native values into a [`Buffer`].
impl<T: ArrowNativeType> FromIterator<T> for Buffer {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let mut items = iter.into_iter();
        let item_size = std::mem::size_of::<T>();

        // Pull the first item before sizing the buffer: the size hint taken
        // afterwards covers the remaining items, and the `+ 1` accounts for
        // the one already taken.
        let mut buffer = if let Some(first) = items.next() {
            let (remaining_lower, _) = items.size_hint();
            let mut seeded =
                MutableBuffer::new(remaining_lower.saturating_add(1) * item_size);
            unsafe {
                std::ptr::write(seeded.as_mut_ptr() as *mut T, first);
                seeded.len = item_size;
            }
            seeded
        } else {
            MutableBuffer::new(0)
        };

        // Everything after the first item goes through the generic extend path.
        buffer.extend_from_iter(items);
        buffer.into()
    }
}

impl std::ops::Deref for MutableBuffer {
type Target = [u8];

@@ -1003,6 +1166,28 @@ impl PartialEq for MutableBuffer {
unsafe impl Sync for MutableBuffer {}
unsafe impl Send for MutableBuffer {}

struct SetLenOnDrop<'a> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this construct is fascinating, but I will take your word that it was necessary for speedup

len: &'a mut usize,
local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
#[inline]
fn new(len: &'a mut usize) -> Self {
SetLenOnDrop {
local_len: *len,
len,
}
}
}

/// Stores the locally tracked length back into the borrowed length when the
/// guard is dropped.
impl<'a> Drop for SetLenOnDrop<'a> {
    #[inline]
    fn drop(&mut self) {
        let committed = self.local_len;
        *self.len = committed;
    }
}

#[cfg(test)]
mod tests {
use std::thread;
@@ -1172,6 +1357,29 @@ mod tests {
assert_eq!(b"hello arrow", buf.as_slice());
}

/// Extending with `u32` values appends four bytes per value, and a second
/// `extend` appends after the bytes written by the first.
#[test]
fn mutable_extend_from_iter() {
    let mut buf = MutableBuffer::new(0);

    // first batch lands at the start of the empty buffer
    buf.extend(vec![1u32, 2]);
    let after_first: [u8; 8] = [1, 0, 0, 0, 2, 0, 0, 0];
    assert_eq!(8, buf.len());
    assert_eq!(&after_first, buf.as_slice());

    // second batch is appended behind the first
    buf.extend(vec![3u32, 4]);
    let after_second: [u8; 16] = [1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0];
    assert_eq!(16, buf.len());
    assert_eq!(&after_second, buf.as_slice());
}

/// A two-element `u32` iterator with an exact size hint produces an
/// eight-byte buffer holding both values.
#[test]
fn test_from_trusted_len_iter() {
    let values = vec![1u32, 2];
    // `vec::IntoIter` reports an exact upper bound, as the function requires.
    let buf = unsafe { Buffer::from_trusted_len_iter(values.into_iter()) };

    let expected: [u8; 8] = [1, 0, 0, 0, 2, 0, 0, 0];
    assert_eq!(8, buf.len());
    assert_eq!(&expected, buf.as_slice());
}

#[test]
fn test_mutable_reserve() {
let mut buf = MutableBuffer::new(1);
1 change: 1 addition & 0 deletions rust/arrow/src/bytes.rs
Original file line number Diff line number Diff line change
@@ -79,6 +79,7 @@ impl Bytes {
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
#[inline]
pub unsafe fn new(
ptr: std::ptr::NonNull<u8>,
len: usize,
Loading