From 8cc9aeabb3bace81949b27727a3562db84182834 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Mon, 10 Feb 2025 21:33:50 +0800 Subject: [PATCH 1/3] Fix double memory allocation caused by collecting the merged batches; Fix batch memory consumption growth after sorting; Reserve memory more aggressively to compensate for memory needed for merging. --- datafusion/core/tests/fuzz_cases/sort_fuzz.rs | 4 +- datafusion/core/tests/memory_limit/mod.rs | 8 +- datafusion/physical-plan/src/sorts/cursor.rs | 22 +- datafusion/physical-plan/src/sorts/sort.rs | 274 ++++++++++++++---- datafusion/physical-plan/src/sorts/stream.rs | 18 +- .../src/sorts/streaming_merge.rs | 3 +- datafusion/physical-plan/src/spill.rs | 10 +- datafusion/physical-plan/src/test.rs | 35 ++- 8 files changed, 304 insertions(+), 70 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs index ecc077261acc..3806e1cd20a7 100644 --- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs @@ -54,7 +54,9 @@ async fn test_sort_10k_mem() { #[tokio::test] #[cfg_attr(tarpaulin, ignore)] async fn test_sort_100k_mem() { - for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, true)] { + for (batch_size, should_spill) in + [(5, false), (10000, false), (20000, true), (1000000, true)] + { SortTest::new() .with_int32_batches(batch_size) .with_pool_size(100 * KB) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 719faed4e454..9a6e4e3a85ee 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -69,7 +69,7 @@ async fn oom_sort() { .with_expected_errors(vec![ "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)", ]) - .with_memory_limit(200_000) + .with_memory_limit(400_000) .run() .await } @@ -271,7 +271,8 @@ async fn sort_spill_reservation() { // Merge operation needs extra memory to do row conversion, so make the // memory limit larger. - let mem_limit = partition_size * 2; + let mem_limit = + ((partition_size * 2 + 1024) as f64 / MEMORY_FRACTION).ceil() as usize; let test = TestCase::new() // This query uses a different order than the input table to // force a sort. It also needs to have multiple columns to @@ -308,7 +309,8 @@ async fn sort_spill_reservation() { test.clone() .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:", + "bytes for ExternalSorterMerge", ]) .with_config(config) .run() diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index e6986b86046c..8ea7c43d2613 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -291,6 +291,10 @@ pub struct ArrayValues { // Otherwise, the first null index null_threshold: usize, options: SortOptions, + + /// Tracks the memory used by the values array, + /// freed on drop. + _reservation: MemoryReservation, } impl ArrayValues { @@ -298,7 +302,11 @@ impl ArrayValues { /// to `options`. 
/// /// Panics if the array is empty - pub fn new>(options: SortOptions, array: &A) -> Self { + pub fn new>( + options: SortOptions, + array: &A, + reservation: MemoryReservation, + ) -> Self { assert!(array.len() > 0, "Empty array passed to FieldCursor"); let null_threshold = match options.nulls_first { true => array.null_count(), @@ -309,6 +317,7 @@ impl ArrayValues { values: array.values(), null_threshold, options, + _reservation: reservation, } } @@ -360,6 +369,12 @@ impl CursorValues for ArrayValues { #[cfg(test)] mod tests { + use std::sync::Arc; + + use datafusion_execution::memory_pool::{ + GreedyMemoryPool, MemoryConsumer, MemoryPool, + }; + use super::*; fn new_primitive( @@ -372,10 +387,15 @@ mod tests { false => values.len() - null_count, }; + let memory_pool: Arc = Arc::new(GreedyMemoryPool::new(10000)); + let consumer = MemoryConsumer::new("test"); + let reservation = consumer.register(&memory_pool); + let values = ArrayValues { values: PrimitiveValues(values), null_threshold, options, + _reservation: reservation, }; Cursor::new(values) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 9f7e82f026bd..09bcfea30d53 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -24,7 +24,7 @@ use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use crate::common::spawn_buffered; +use crate::common::{spawn_buffered, IPCWriter}; use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; use crate::limit::LimitStream; @@ -225,8 +225,6 @@ struct ExternalSorter { // ======================================================================== /// Potentially unsorted in memory buffer in_mem_batches: Vec, - /// if `Self::in_mem_batches` are sorted - in_mem_batches_sorted: bool, /// If data has previously been spilled, the locations of the /// spill files (in Arrow IPC format) @@ -279,7 +277,6 @@ impl ExternalSorter { Self { schema, in_mem_batches: vec![], - in_mem_batches_sorted: true, spills: vec![], expr: expr.into(), metrics, @@ -302,31 +299,16 @@ impl ExternalSorter { } self.reserve_memory_for_merge()?; - let size = get_record_batch_memory_size(&input); - + let size = get_reserved_byte_for_record_batch(&input); if self.reservation.try_grow(size).is_err() { - let before = self.reservation.size(); - self.in_mem_sort().await?; - - // Sorting may have freed memory, especially if fetch is `Some` - // - // As such we check again, and if the memory usage has dropped by - // a factor of 2, and we can allocate the necessary capacity, - // we don't spill - // - // The factor of 2 aims to avoid a degenerate case where the - // memory required for `fetch` is just under the memory available, - // causing repeated re-sorting of data - if self.reservation.size() > before / 2 - || self.reservation.try_grow(size).is_err() - { - self.spill().await?; - self.reservation.try_grow(size)? - } + self.sort_or_spill_in_mem_batches().await?; + // We've already freed more than half of reserved memory, + // so we can grow the reservation again. There's nothing we can do + // if this try_grow fails. + self.reservation.try_grow(size)?; } self.in_mem_batches.push(input); - self.in_mem_batches_sorted = false; Ok(()) } @@ -344,6 +326,11 @@ impl ExternalSorter { /// 2. A combined streaming merge incorporating both in-memory /// batches and data from spill files on disk. 
fn sort(&mut self) -> Result { + // Release the memory reserved for merge back to the pool so + // there is some left when `in_mem_sort_stream` requests an + // allocation. + self.merge_reservation.free(); + if self.spilled_before() { let mut streams = vec![]; if !self.in_mem_batches.is_empty() { @@ -369,7 +356,7 @@ impl ExternalSorter { .with_metrics(self.metrics.baseline.clone()) .with_batch_size(self.batch_size) .with_fetch(self.fetch) - .with_reservation(self.reservation.new_empty()) + .with_reservation(self.merge_reservation.new_empty()) .build() } else { self.in_mem_sort_stream(self.metrics.baseline.clone()) @@ -408,50 +395,114 @@ impl ExternalSorter { debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); - self.in_mem_sort().await?; - let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?; let batches = std::mem::take(&mut self.in_mem_batches); - let spilled_rows = spill_record_batches( + let (spilled_rows, spilled_bytes) = spill_record_batches( batches, spill_file.path().into(), Arc::clone(&self.schema), )?; let used = self.reservation.free(); self.metrics.spill_count.add(1); - self.metrics.spilled_bytes.add(used); + self.metrics.spilled_bytes.add(spilled_bytes); self.metrics.spilled_rows.add(spilled_rows); self.spills.push(spill_file); Ok(used) } /// Sorts the in_mem_batches in place - async fn in_mem_sort(&mut self) -> Result<()> { - if self.in_mem_batches_sorted { - return Ok(()); - } - + /// + /// Sorting may have freed memory, especially if fetch is `Some`. If + /// the memory usage has dropped by a factor of 2, then we don't have + /// to spill. Otherwise, we spill to free up memory for inserting + /// more batches. + /// + /// The factor of 2 aims to avoid a degenerate case where the + /// memory required for `fetch` is just under the memory available, + // causing repeated re-sorting of data + async fn sort_or_spill_in_mem_batches(&mut self) -> Result<()> { // Release the memory reserved for merge back to the pool so - // there is some left when `in_memo_sort_stream` requests an + // there is some left when `in_mem_sort_stream` requests an // allocation. self.merge_reservation.free(); - self.in_mem_batches = self - .in_mem_sort_stream(self.metrics.baseline.intermediate())? - .try_collect() - .await?; + let before = self.reservation.size(); + + let mut sorted_stream = + self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; + + // `self.in_mem_batches` is already taken away by the sort_stream, now it is empty. + // We'll gradually collect the sorted stream into self.in_mem_batches, or directly + // write sorted batches to disk when the memory is insufficient. + let mut spill_writer: Option = None; + // Leave at least 1/3 of spill reservation for sort/merge the next batch. Here the + // 1/3 is simply an arbitrary chosen number. + let sort_merge_minimum_overhead = self.sort_spill_reservation_bytes / 3; + while let Some(batch) = sorted_stream.next().await { + let batch = batch?; + match &mut spill_writer { + None => { + let sorted_size = get_reserved_byte_for_record_batch(&batch); + + // We reserve more memory to ensure that we'll have enough memory for + // `SortPreservingMergeStream` after consuming this batch, otherwise we'll + // start spilling everything to disk. + if self + .reservation + .try_grow(sorted_size + sort_merge_minimum_overhead) + .is_err() + { + // Directly write in_mem_batches as well as all the remaining batches in + // sorted_stream to disk. 
Further batches fetched from `sorted_stream` will + // be handled by the `Some(writer)` matching arm. + let spill_file = + self.runtime.disk_manager.create_tmp_file("Sorting")?; + let mut writer = IPCWriter::new(spill_file.path(), &self.schema)?; + // Flush everything in memory to the spill file + for batch in self.in_mem_batches.drain(..) { + writer.write(&batch)?; + } + // as well as the newly sorted batch + writer.write(&batch)?; + spill_writer = Some(writer); + self.reservation.free(); + self.spills.push(spill_file); + } else { + self.in_mem_batches.push(batch); + + // Gives back memory for merging the next batch. + self.reservation.shrink(sort_merge_minimum_overhead); + } + } + Some(writer) => { + writer.write(&batch)?; + } + } + } - let size: usize = self - .in_mem_batches - .iter() - .map(get_record_batch_memory_size) - .sum(); + // Drop early to free up memory reserved by the sorted stream, otherwise the + // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory. + drop(sorted_stream); + + if let Some(writer) = &mut spill_writer { + writer.finish()?; + self.metrics.spill_count.add(1); + self.metrics.spilled_rows.add(writer.num_rows); + self.metrics.spilled_bytes.add(writer.num_bytes); + } + + // Sorting may free up some memory especially when fetch is `Some`. If we have + // not freed more than 50% of the memory, then we have to spill to free up more + // memory for inserting more batches. + if spill_writer.is_none() && self.reservation.size() > before / 2 { + // We have not freed more than 50% of the memory, so we have to spill to + // free up more memory + self.spill().await?; + } // Reserve headroom for next sort/merge self.reserve_memory_for_merge()?; - self.reservation.try_resize(size)?; - self.in_mem_batches_sorted = true; Ok(()) } @@ -528,6 +579,12 @@ impl ExternalSorter { let elapsed_compute = metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); + // Please pay attention that any operation inside of `in_mem_sort_stream` will + // not perform any memory reservation. This is for avoiding the need of handling + // reservation failure and spilling in the middle of the sort/merge. The memory + // space for batches produced by the resulting stream will be reserved by the + // consumer of the stream. 
+ if self.in_mem_batches.len() == 1 { let batch = self.in_mem_batches.swap_remove(0); let reservation = self.reservation.take(); @@ -540,7 +597,7 @@ impl ExternalSorter { let batch = concat_batches(&self.schema, &self.in_mem_batches)?; self.in_mem_batches.clear(); self.reservation - .try_resize(get_record_batch_memory_size(&batch))?; + .try_resize(get_reserved_byte_for_record_batch(&batch))?; let reservation = self.reservation.take(); return self.sort_batch_stream(batch, metrics, reservation); } @@ -549,8 +606,9 @@ impl ExternalSorter { .into_iter() .map(|batch| { let metrics = self.metrics.baseline.intermediate(); - let reservation = - self.reservation.split(get_record_batch_memory_size(&batch)); + let reservation = self + .reservation + .split(get_reserved_byte_for_record_batch(&batch)); let input = self.sort_batch_stream(batch, metrics, reservation)?; Ok(spawn_buffered(input, 1)) }) @@ -579,7 +637,10 @@ impl ExternalSorter { metrics: BaselineMetrics, reservation: MemoryReservation, ) -> Result { - assert_eq!(get_record_batch_memory_size(&batch), reservation.size()); + assert_eq!( + get_reserved_byte_for_record_batch(&batch), + reservation.size() + ); let schema = batch.schema(); let fetch = self.fetch; @@ -612,6 +673,20 @@ impl ExternalSorter { } } +/// Estimate how much memory is needed to sort a `RecordBatch`. +/// +/// This is used to pre-reserve memory for the sort/merge. The sort/merge process involves +/// creating sorted copies of sorted columns in record batches, the sorted copies could be +/// in either row format or array format. Please refer to cursor.rs and stream.rs for more +/// details. No matter what format the sorted copies are, they will use more memory than +/// the original record batch. +fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize { + // 2x may not be enough for some cases, but it's a good start. + // If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes` + // to compensate for the extra memory needed. + get_record_batch_memory_size(batch) * 2 +} + impl Debug for ExternalSorter { fn fmt(&self, f: &mut Formatter) -> fmt::Result { f.debug_struct("ExternalSorter") @@ -641,7 +716,15 @@ pub fn sort_batch( lexsort_to_indices(&sort_columns, fetch)? }; - let columns = take_arrays(batch.columns(), &indices, None)?; + let mut columns = take_arrays(batch.columns(), &indices, None)?; + + // The columns may be larger than the unsorted columns in `batch` especially for variable length + // data types due to exponential growth when building the sort columns. We shrink the columns + // to prevent memory reservation failures, as well as excessive memory allocation when running + // merges in `SortPreservingMergeStream`. + columns.iter_mut().for_each(|c| { + c.shrink_to_fit(); + }); let options = RecordBatchOptions::new().with_row_count(Some(indices.len())); Ok(RecordBatch::try_new_with_options( @@ -1246,6 +1329,9 @@ mod tests { .with_runtime(runtime), ); + // The input has 100 partitions, each partition has a batch containing 100 rows. + // Each row has a single Int32 column with values 0..100. The total size of the + // input is roughly 40000 bytes. 
     let partitions = 100;
     let input = test::scan_partitioned(partitions);
     let schema = input.schema();
@@ -1271,9 +1357,16 @@ mod tests {
     assert_eq!(metrics.output_rows().unwrap(), 10000);
     assert!(metrics.elapsed_compute().unwrap() > 0);
-    assert_eq!(metrics.spill_count().unwrap(), 3);
-    assert_eq!(metrics.spilled_bytes().unwrap(), 36000);
-    assert_eq!(metrics.spilled_rows().unwrap(), 9000);
+
+    let spill_count = metrics.spill_count().unwrap();
+    let spilled_rows = metrics.spilled_rows().unwrap();
+    let spilled_bytes = metrics.spilled_bytes().unwrap();
+    // Processing 40000 bytes of data using 12288 bytes of memory requires 3 spills
+    // unless we do something really clever. It will spill roughly 9000+ rows and 36000
+    // bytes. We leave a little wiggle room for the actual numbers.
+    assert!((3..=10).contains(&spill_count));
+    assert!((9000..=10000).contains(&spilled_rows));
+    assert!((36000..=40000).contains(&spilled_bytes));
 
     let columns = result[0].columns();
@@ -1290,6 +1383,77 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_sort_spill_utf8_strings() -> Result<()> {
+        let session_config = SessionConfig::new()
+            .with_batch_size(100)
+            .with_sort_in_place_threshold_bytes(20 * 1024)
+            .with_sort_spill_reservation_bytes(100 * 1024);
+        let runtime = RuntimeEnvBuilder::new()
+            .with_memory_limit(500 * 1024, 1.0)
+            .build_arc()?;
+        let task_ctx = Arc::new(
+            TaskContext::default()
+                .with_session_config(session_config)
+                .with_runtime(runtime),
+        );
+
+        // The input has 200 partitions, each partition has a batch containing 100 rows.
+        // Each row has a single Utf8 column; the Utf8 string values are roughly 42 bytes.
+        // The total size of the input is roughly 840 KB.
+        let input = test::scan_partitioned_utf8(200);
+        let schema = input.schema();
+
+        let sort_exec = Arc::new(SortExec::new(
+            LexOrdering::new(vec![PhysicalSortExpr {
+                expr: col("i", &schema)?,
+                options: SortOptions::default(),
+            }]),
+            Arc::new(CoalescePartitionsExec::new(input)),
+        ));
+
+        let result = collect(
+            Arc::clone(&sort_exec) as Arc<dyn ExecutionPlan>,
+            Arc::clone(&task_ctx),
+        )
+        .await?;
+
+        let num_rows = result.iter().map(|batch| batch.num_rows()).sum::<usize>();
+        assert_eq!(num_rows, 20000);
+
+        // Now, validate metrics
+        let metrics = sort_exec.metrics().unwrap();
+
+        assert_eq!(metrics.output_rows().unwrap(), 20000);
+        assert!(metrics.elapsed_compute().unwrap() > 0);
+
+        let spill_count = metrics.spill_count().unwrap();
+        let spilled_rows = metrics.spilled_rows().unwrap();
+        let spilled_bytes = metrics.spilled_bytes().unwrap();
+        // Processing 840 KB of data using 400 KB of memory requires at least 2 spills.
+        // It will spill roughly 18000 rows and 800 KBytes.
+        // We leave a little wiggle room for the actual numbers.
+        assert!((2..=10).contains(&spill_count));
+        assert!((15000..=20000).contains(&spilled_rows));
+        assert!((700000..=900000).contains(&spilled_bytes));
+
+        // Verify that the result is sorted
+        let concated_result = concat_batches(&schema, &result)?;
+        let columns = concated_result.columns();
+        let string_array = as_string_array(&columns[0]);
+        for i in 0..string_array.len() - 1 {
+            assert!(string_array.value(i) <= string_array.value(i + 1));
+        }
+
+        assert_eq!(
+            task_ctx.runtime_env().memory_pool.reserved(),
+            0,
+            "The sort should have returned all memory used back to the memory manager"
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_sort_fetch_memory_calculation() -> Result<()> {
         // This test mirrors down the size from the example above.
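
[Editor's note: illustrative sketch, not part of the patch series.] The sort.rs changes above follow a reserve-or-spill pattern against the shared memory pool: estimate roughly 2x the batch footprint (see `get_reserved_byte_for_record_batch`), `try_grow` the reservation, and spill plus `free` when the pool refuses the request. Below is a minimal, self-contained Rust sketch of that pattern using DataFusion's public memory-pool API. Here `spill_somehow` is a hypothetical stand-in for the real Arrow IPC spill writer, and `RecordBatch::get_array_memory_size` stands in for `get_record_batch_memory_size`.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array, RecordBatch};
    use datafusion_common::Result;
    use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

    fn make_batch(start: i32) -> RecordBatch {
        let values = Int32Array::from_iter_values(start * 1000..start * 1000 + 1000);
        RecordBatch::try_from_iter(vec![("i", Arc::new(values) as ArrayRef)]).unwrap()
    }

    // Hypothetical stand-in for writing the accumulated batches to an Arrow IPC spill file.
    fn spill_somehow(batches: Vec<RecordBatch>) {
        drop(batches);
    }

    fn main() -> Result<()> {
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(16 * 1024));
        let mut reservation = MemoryConsumer::new("ExternalSorterSketch").register(&pool);

        let mut in_mem_batches: Vec<RecordBatch> = vec![];
        for batch in (0..8).map(make_batch) {
            // Reserve ~2x the batch footprint, mirroring get_reserved_byte_for_record_batch,
            // because sorting/merging keeps extra row- or array-format copies of the sort keys.
            let size = batch.get_array_memory_size() * 2;
            if reservation.try_grow(size).is_err() {
                // The pool is exhausted: spill what we have, hand the memory back to the pool,
                // then reserve space for the new batch.
                spill_somehow(std::mem::take(&mut in_mem_batches));
                reservation.free();
                reservation.try_grow(size)?;
            }
            in_mem_batches.push(batch);
        }
        println!("{} batches kept in memory", in_mem_batches.len());
        Ok(())
    }

In the patch itself the same pattern appears twice: `insert_batch` reserves before buffering an input batch, and `sort_or_spill_in_mem_batches` re-reserves for each sorted batch it collects back, routing the remainder through an `IPCWriter` when `try_grow` fails.
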
diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index ab8054be59a8..e029c60b285b 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -159,6 +159,8 @@ pub struct FieldCursorStream { sort: PhysicalSortExpr, /// Input streams streams: FusedStreams, + /// Create new reservations for each array + reservation: MemoryReservation, phantom: PhantomData T>, } @@ -171,11 +173,16 @@ impl std::fmt::Debug for FieldCursorStream { } impl FieldCursorStream { - pub fn new(sort: PhysicalSortExpr, streams: Vec) -> Self { + pub fn new( + sort: PhysicalSortExpr, + streams: Vec, + reservation: MemoryReservation, + ) -> Self { let streams = streams.into_iter().map(|s| s.fuse()).collect(); Self { sort, streams: FusedStreams(streams), + reservation, phantom: Default::default(), } } @@ -183,8 +190,15 @@ impl FieldCursorStream { fn convert_batch(&mut self, batch: &RecordBatch) -> Result> { let value = self.sort.expr.evaluate(batch)?; let array = value.into_array(batch.num_rows())?; + let size_in_mem = array.get_buffer_memory_size(); let array = array.as_any().downcast_ref::().expect("field values"); - Ok(ArrayValues::new(self.sort.options, array)) + let mut array_reservation = self.reservation.new_empty(); + array_reservation.try_grow(size_in_mem)?; + Ok(ArrayValues::new( + self.sort.options, + array, + array_reservation, + )) } } diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 909b5875c8c5..a541f79dc717 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -38,7 +38,8 @@ macro_rules! primitive_merge_helper { macro_rules! 
merge_helper { ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident, $enable_round_robin_tie_breaker:ident) => {{ - let streams = FieldCursorStream::<$t>::new($sort, $streams); + let streams = + FieldCursorStream::<$t>::new($sort, $streams, $reservation.new_empty()); return Ok(Box::pin(SortPreservingMergeStream::new( Box::new(streams), $schema, diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs index dbcc46baf8ca..b45353ae13f0 100644 --- a/datafusion/physical-plan/src/spill.rs +++ b/datafusion/physical-plan/src/spill.rs @@ -62,7 +62,7 @@ pub(crate) fn spill_record_batches( batches: Vec, path: PathBuf, schema: SchemaRef, -) -> Result { +) -> Result<(usize, usize)> { let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; for batch in batches { writer.write(&batch)?; @@ -74,7 +74,7 @@ pub(crate) fn spill_record_batches( writer.num_rows, human_readable_size(writer.num_bytes), ); - Ok(writer.num_rows) + Ok((writer.num_rows, writer.num_bytes)) } fn read_spill(sender: Sender>, path: &Path) -> Result<()> { @@ -213,12 +213,12 @@ mod tests { let spill_file = disk_manager.create_tmp_file("Test Spill")?; let schema = batch1.schema(); let num_rows = batch1.num_rows() + batch2.num_rows(); - let cnt = spill_record_batches( + let (spilled_rows, _) = spill_record_batches( vec![batch1, batch2], spill_file.path().into(), Arc::clone(&schema), - ); - assert_eq!(cnt.unwrap(), num_rows); + )?; + assert_eq!(spilled_rows, num_rows); let file = BufReader::new(File::open(spill_file.path())?); let reader = FileReader::try_new(file, None)?; diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index e73d6d97e986..ad0e43503b2b 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -21,8 +21,8 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; -use arrow::array::{ArrayRef, Int32Array, RecordBatch}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use futures::{Future, FutureExt}; @@ -132,11 +132,30 @@ pub fn make_partition(sz: i32) -> RecordBatch { RecordBatch::try_new(schema, vec![arr]).unwrap() } +pub fn make_partition_utf8(sz: i32) -> RecordBatch { + let seq_start = 0; + let seq_end = sz; + let values = (seq_start..seq_end) + .map(|i| format!("test_long_string_that_is_roughly_42_bytes_{}", i)) + .collect::>(); + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Utf8, true)])); + let mut string_array = arrow::array::StringArray::from(values); + string_array.shrink_to_fit(); + let arr = Arc::new(string_array); + let arr = arr as ArrayRef; + + RecordBatch::try_new(schema, vec![arr]).unwrap() +} + /// Returns a `DataSourceExec` that scans `partitions` of 100 batches each pub fn scan_partitioned(partitions: usize) -> Arc { Arc::new(mem_exec(partitions)) } +pub fn scan_partitioned_utf8(partitions: usize) -> Arc { + Arc::new(mem_exec_utf8(partitions)) +} + /// Returns a `DataSourceExec` that scans `partitions` of 100 batches each pub fn mem_exec(partitions: usize) -> DataSourceExec { let data: Vec> = (0..partitions).map(|_| vec![make_partition(100)]).collect(); @@ -148,6 +167,18 @@ pub fn mem_exec(partitions: usize) -> DataSourceExec { )) } +pub fn mem_exec_utf8(partitions: usize) -> 
DataSourceExec {
+    let data: Vec<Vec<RecordBatch>> = (0..partitions)
+        .map(|_| vec![make_partition_utf8(100)])
+        .collect();
+
+    let schema = data[0][0].schema();
+    let projection = None;
+    DataSourceExec::new(Arc::new(
+        MemorySourceConfig::try_new(&data, schema, projection).unwrap(),
+    ))
+}
+
 // Construct a stream partition for test purposes
 #[derive(Debug)]
 pub struct TestPartitionStream {

From b652cee78031bf566a93a153a1aa4cade71169c5 Mon Sep 17 00:00:00 2001
From: Kristin Cowalcijk
Date: Fri, 14 Feb 2025 17:26:15 +0800
Subject: [PATCH 2/3] Don't reserve additional memory for merging after
 fetching the first merged batch

---
 datafusion/core/tests/memory_limit/mod.rs  |  2 +-
 datafusion/physical-plan/src/sorts/sort.rs | 16 +---------------
 2 files changed, 2 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 9a6e4e3a85ee..669294d38af1 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -69,7 +69,7 @@ async fn oom_sort() {
         .with_expected_errors(vec![
             "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
         ])
-        .with_memory_limit(400_000)
+        .with_memory_limit(500_000)
         .run()
         .await
 }
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 09bcfea30d53..00c863036edb 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -435,23 +435,12 @@ impl ExternalSorter {
         // We'll gradually collect the sorted stream into self.in_mem_batches, or directly
         // write sorted batches to disk when the memory is insufficient.
         let mut spill_writer: Option<IPCWriter> = None;
-        // Leave at least 1/3 of spill reservation for sort/merge the next batch. Here the
-        // 1/3 is simply an arbitrary chosen number.
-        let sort_merge_minimum_overhead = self.sort_spill_reservation_bytes / 3;
         while let Some(batch) = sorted_stream.next().await {
             let batch = batch?;
             match &mut spill_writer {
                 None => {
                     let sorted_size = get_reserved_byte_for_record_batch(&batch);
-
-                    // We reserve more memory to ensure that we'll have enough memory for
-                    // `SortPreservingMergeStream` after consuming this batch, otherwise we'll
-                    // start spilling everything to disk.
-                    if self
-                        .reservation
-                        .try_grow(sorted_size + sort_merge_minimum_overhead)
-                        .is_err()
-                    {
+                    if self.reservation.try_grow(sorted_size).is_err() {
                         // Directly write in_mem_batches as well as all the remaining batches in
                         // sorted_stream to disk. Further batches fetched from `sorted_stream` will
                         // be handled by the `Some(writer)` matching arm.
@@ -469,9 +458,6 @@ impl ExternalSorter {
                         self.spills.push(spill_file);
                     } else {
                         self.in_mem_batches.push(batch);
-
-                        // Gives back memory for merging the next batch.
- self.reservation.shrink(sort_merge_minimum_overhead); } } Some(writer) => { From babe5cdf512062c6e8f5b830570ba233bb138026 Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Sun, 16 Feb 2025 20:41:37 +0800 Subject: [PATCH 3/3] Address review comments --- datafusion/core/tests/fuzz_cases/sort_fuzz.rs | 210 ++++++++++++++++-- datafusion/physical-plan/src/sorts/sort.rs | 16 +- 2 files changed, 204 insertions(+), 22 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs index 3806e1cd20a7..51a5bc87efd9 100644 --- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arrow::{ - array::{ArrayRef, Int32Array}, + array::{as_string_array, ArrayRef, Int32Array, StringArray}, compute::SortOptions, record_batch::RecordBatch, }; @@ -29,6 +29,7 @@ use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_common::cast::as_int32_array; use datafusion_execution::memory_pool::GreedyMemoryPool; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -42,12 +43,17 @@ const KB: usize = 1 << 10; #[cfg_attr(tarpaulin, ignore)] async fn test_sort_10k_mem() { for (batch_size, should_spill) in [(5, false), (20000, true), (500000, true)] { - SortTest::new() + let (input, collected) = SortTest::new() .with_int32_batches(batch_size) + .with_sort_columns(vec!["x"]) .with_pool_size(10 * KB) .with_should_spill(should_spill) .run() .await; + + let expected = partitions_to_sorted_vec(&input); + let actual = batches_to_vec(&collected); + assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}"); } } @@ -57,29 +63,119 @@ async fn test_sort_100k_mem() { for (batch_size, should_spill) in [(5, false), (10000, false), (20000, true), (1000000, true)] { - SortTest::new() + let (input, collected) = SortTest::new() .with_int32_batches(batch_size) + .with_sort_columns(vec!["x"]) + .with_pool_size(100 * KB) + .with_should_spill(should_spill) + .run() + .await; + + let expected = partitions_to_sorted_vec(&input); + let actual = batches_to_vec(&collected); + assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}"); + } +} + +#[tokio::test] +#[cfg_attr(tarpaulin, ignore)] +async fn test_sort_strings_100k_mem() { + for (batch_size, should_spill) in + [(5, false), (1000, false), (10000, true), (20000, true)] + { + let (input, collected) = SortTest::new() + .with_utf8_batches(batch_size) + .with_sort_columns(vec!["x"]) .with_pool_size(100 * KB) .with_should_spill(should_spill) .run() .await; + + let mut input = input + .iter() + .flat_map(|p| p.iter()) + .flat_map(|b| { + let array = b.column(0); + as_string_array(array) + .iter() + .map(|s| s.unwrap().to_string()) + }) + .collect::>(); + input.sort_unstable(); + let actual = collected + .iter() + .flat_map(|b| { + let array = b.column(0); + as_string_array(array) + .iter() + .map(|s| s.unwrap().to_string()) + }) + .collect::>(); + assert_eq!(input, actual); + } +} + +#[tokio::test] +#[cfg_attr(tarpaulin, ignore)] +async fn test_sort_multi_columns_100k_mem() { + for (batch_size, should_spill) in + [(5, false), (1000, false), (10000, true), (20000, true)] + { + let (input, collected) = SortTest::new() + .with_int32_utf8_batches(batch_size) + .with_sort_columns(vec!["x", 
"y"]) + .with_pool_size(100 * KB) + .with_should_spill(should_spill) + .run() + .await; + + fn record_batch_to_vec(b: &RecordBatch) -> Vec<(i32, String)> { + let mut rows: Vec<_> = Vec::new(); + let i32_array = as_int32_array(b.column(0)).unwrap(); + let string_array = as_string_array(b.column(1)); + for i in 0..b.num_rows() { + let str = string_array.value(i).to_string(); + let i32 = i32_array.value(i); + rows.push((i32, str)); + } + rows + } + let mut input = input + .iter() + .flat_map(|p| p.iter()) + .flat_map(record_batch_to_vec) + .collect::>(); + input.sort_unstable(); + let actual = collected + .iter() + .flat_map(record_batch_to_vec) + .collect::>(); + assert_eq!(input, actual); } } #[tokio::test] async fn test_sort_unlimited_mem() { for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, false)] { - SortTest::new() + let (input, collected) = SortTest::new() .with_int32_batches(batch_size) + .with_sort_columns(vec!["x"]) .with_pool_size(usize::MAX) .with_should_spill(should_spill) .run() .await; + + let expected = partitions_to_sorted_vec(&input); + let actual = batches_to_vec(&collected); + assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}"); } } + #[derive(Debug, Default)] struct SortTest { input: Vec>, + /// The names of the columns to sort by + sort_columns: Vec, /// GreedyMemoryPool size, if specified pool_size: Option, /// If true, expect the sort to spill @@ -91,12 +187,29 @@ impl SortTest { Default::default() } + fn with_sort_columns(mut self, sort_columns: Vec<&str>) -> Self { + self.sort_columns = sort_columns.iter().map(|s| s.to_string()).collect(); + self + } + /// Create batches of int32 values of rows fn with_int32_batches(mut self, rows: usize) -> Self { self.input = vec![make_staggered_i32_batches(rows)]; self } + /// Create batches of utf8 values of rows + fn with_utf8_batches(mut self, rows: usize) -> Self { + self.input = vec![make_staggered_utf8_batches(rows)]; + self + } + + /// Create batches of int32 and utf8 values of rows + fn with_int32_utf8_batches(mut self, rows: usize) -> Self { + self.input = vec![make_staggered_i32_utf8_batches(rows)]; + self + } + /// specify that this test should use a memory pool of the specified size fn with_pool_size(mut self, pool_size: usize) -> Self { self.pool_size = Some(pool_size); @@ -110,7 +223,7 @@ impl SortTest { /// Sort the input using SortExec and ensure the results are /// correct according to `Vec::sort` both with and without spilling - async fn run(&self) { + async fn run(&self) -> (Vec>, Vec) { let input = self.input.clone(); let first_batch = input .iter() @@ -119,16 +232,21 @@ impl SortTest { .expect("at least one batch"); let schema = first_batch.schema(); - let sort = LexOrdering::new(vec![PhysicalSortExpr { - expr: col("x", &schema).unwrap(), - options: SortOptions { - descending: false, - nulls_first: true, - }, - }]); + let sort_ordering = LexOrdering::new( + self.sort_columns + .iter() + .map(|c| PhysicalSortExpr { + expr: col(c, &schema).unwrap(), + options: SortOptions { + descending: false, + nulls_first: true, + }, + }) + .collect(), + ); let exec = MemorySourceConfig::try_new_exec(&input, schema, None).unwrap(); - let sort = Arc::new(SortExec::new(sort, exec)); + let sort = Arc::new(SortExec::new(sort_ordering, exec)); let session_config = SessionConfig::new(); let session_ctx = if let Some(pool_size) = self.pool_size { @@ -153,9 +271,6 @@ impl SortTest { let task_ctx = session_ctx.task_ctx(); let collected = collect(sort.clone(), task_ctx).await.unwrap(); - let 
expected = partitions_to_sorted_vec(&input); - let actual = batches_to_vec(&collected); - if self.should_spill { assert_ne!( sort.metrics().unwrap().spill_count().unwrap(), @@ -175,7 +290,8 @@ impl SortTest { 0, "The sort should have returned all memory used back to the memory pool" ); - assert_eq!(expected, actual, "failure in @ pool_size {self:?}"); + + (input, collected) } } @@ -203,3 +319,63 @@ fn make_staggered_i32_batches(len: usize) -> Vec { } batches } + +/// Return randomly sized record batches in a field named 'x' of type `Utf8` +/// with randomized content +fn make_staggered_utf8_batches(len: usize) -> Vec { + let mut rng = rand::thread_rng(); + let max_batch = 1024; + + let mut batches = vec![]; + let mut remaining = len; + while remaining != 0 { + let to_read = rng.gen_range(0..=remaining.min(max_batch)); + remaining -= to_read; + + batches.push( + RecordBatch::try_from_iter(vec![( + "x", + Arc::new(StringArray::from_iter_values( + (0..to_read).map(|_| format!("test_string_{}", rng.gen::())), + )) as ArrayRef, + )]) + .unwrap(), + ) + } + batches +} + +/// Return randomly sized record batches in a field named 'x' of type `Int32` +/// with randomized i32 content and a field named 'y' of type `Utf8` +/// with randomized content +fn make_staggered_i32_utf8_batches(len: usize) -> Vec { + let mut rng = rand::thread_rng(); + let max_batch = 1024; + + let mut batches = vec![]; + let mut remaining = len; + while remaining != 0 { + let to_read = rng.gen_range(0..=remaining.min(max_batch)); + remaining -= to_read; + + batches.push( + RecordBatch::try_from_iter(vec![ + ( + "x", + Arc::new(Int32Array::from_iter_values( + (0..to_read).map(|_| rng.gen()), + )) as ArrayRef, + ), + ( + "y", + Arc::new(StringArray::from_iter_values( + (0..to_read).map(|_| format!("test_string_{}", rng.gen::())), + )) as ArrayRef, + ), + ]) + .unwrap(), + ) + } + + batches +} diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 00c863036edb..649468260e56 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -225,6 +225,8 @@ struct ExternalSorter { // ======================================================================== /// Potentially unsorted in memory buffer in_mem_batches: Vec, + /// if `Self::in_mem_batches` are sorted + in_mem_batches_sorted: bool, /// If data has previously been spilled, the locations of the /// spill files (in Arrow IPC format) @@ -277,6 +279,7 @@ impl ExternalSorter { Self { schema, in_mem_batches: vec![], + in_mem_batches_sorted: false, spills: vec![], expr: expr.into(), metrics, @@ -309,6 +312,7 @@ impl ExternalSorter { } self.in_mem_batches.push(input); + self.in_mem_batches_sorted = false; Ok(()) } @@ -423,7 +427,8 @@ impl ExternalSorter { async fn sort_or_spill_in_mem_batches(&mut self) -> Result<()> { // Release the memory reserved for merge back to the pool so // there is some left when `in_mem_sort_stream` requests an - // allocation. + // allocation. At the end of this function, memory will be + // reserved again for the next spill. self.merge_reservation.free(); let before = self.reservation.size(); @@ -458,6 +463,7 @@ impl ExternalSorter { self.spills.push(spill_file); } else { self.in_mem_batches.push(batch); + self.in_mem_batches_sorted = true; } } Some(writer) => { @@ -662,10 +668,10 @@ impl ExternalSorter { /// Estimate how much memory is needed to sort a `RecordBatch`. /// /// This is used to pre-reserve memory for the sort/merge. 
The sort/merge process involves -/// creating sorted copies of sorted columns in record batches, the sorted copies could be -/// in either row format or array format. Please refer to cursor.rs and stream.rs for more -/// details. No matter what format the sorted copies are, they will use more memory than -/// the original record batch. +/// creating sorted copies of sorted columns in record batches for speeding up comparison +/// in sorting and merging. The sorted copies are in either row format or array format. +/// Please refer to cursor.rs and stream.rs for more details. No matter what format the +/// sorted copies are, they will use more memory than the original record batch. fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize { // 2x may not be enough for some cases, but it's a good start. // If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`