diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 281da1f69e69..86e6221c3bc5 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -254,6 +254,24 @@ config_namespace! { /// Number of files to read in parallel when inferring schema and statistics pub meta_fetch_concurrency: usize, default = 32 + + /// Target number of rows in output files when writing multiple. + /// This is a soft max, so it can be exceeded slightly. There also + /// will be one file smaller than the limit if the total + /// number of rows written is not roughly divisible by the soft max + pub soft_max_rows_per_output_file: usize, default = 50000000 + + /// This is the maximum number of output files being written + /// in parallel. Higher values can potentially give faster write + /// performance at the cost of higher peak memory consumption. + pub max_parallel_ouput_files: usize, default = 8 + + /// This is the maximum number of RecordBatches buffered + /// for each output file being worked. Higher values can potentially + /// give faster write performance at the cost of higher peak + /// memory consumption + pub max_buffered_batches_per_output_file: usize, default = 2 + } } diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 41265ede7fc7..bc01b29ba04b 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -23,11 +23,10 @@ use std::fmt; use std::fmt::Debug; use std::sync::Arc; +use super::write::{stateless_append_all, stateless_multipart_put}; use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD}; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::file_format::write::{ - create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode, -}; +use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode}; use crate::datasource::physical_plan::{ CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig, }; @@ -51,7 +50,6 @@ use bytes::{Buf, Bytes}; use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; -use rand::distributions::{Alphanumeric, DistString}; /// Character Separated Value `FileFormat` implementation. #[derive(Debug)] @@ -481,6 +479,82 @@ impl CsvSink { fn new(config: FileSinkConfig) -> Self { Self { config } } + + async fn append_all( + &self, + data: SendableRecordBatchStream, + context: &Arc, + ) -> Result { + let writer_options = self.config.file_type_writer_options.try_into_csv()?; + let (builder, compression) = + (&writer_options.writer_options, &writer_options.compression); + let compression = FileCompressionType::from(*compression); + + let object_store = context + .runtime_env() + .object_store(&self.config.object_store_url)?; + let file_groups = &self.config.file_groups; + + let builder_clone = builder.clone(); + let options_clone = writer_options.clone(); + let get_serializer = move |file_size| { + let inner_clone = builder_clone.clone(); + // In append mode, consider has_header flag only when file is empty (at the start). + // For other modes, use has_header flag as is. 
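The comment above states the append-mode rule: the CSV header is emitted only when the target file is still empty, so repeated inserts do not duplicate it. A tiny stand-alone illustration of that rule, using plain `std` rather than the Arrow CSV writer (names and the two-column schema are made up for the sketch):

```rust
// Illustrative only: emit the CSV header only when the target file is empty,
// mirroring the append-mode rule described in the comment above.
fn serialize_csv(rows: &[(u32, u32)], existing_file_size: u64) -> String {
    let mut out = String::new();
    if existing_file_size == 0 {
        out.push_str("a,b\n"); // header goes into a brand-new file only
    }
    for (a, b) in rows {
        out.push_str(&format!("{a},{b}\n"));
    }
    out
}

fn main() {
    let first = serialize_csv(&[(1, 2)], 0); // empty file: header included
    let second = serialize_csv(&[(3, 4)], first.len() as u64); // append: no header
    assert!(first.starts_with("a,b\n"));
    assert!(!second.contains("a,b"));
    print!("{first}{second}");
}
```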
+ let serializer: Box = Box::new(if file_size > 0 { + CsvSerializer::new() + .with_builder(inner_clone) + .with_header(false) + } else { + CsvSerializer::new() + .with_builder(inner_clone) + .with_header(options_clone.has_header) + }); + serializer + }; + + stateless_append_all( + data, + context, + object_store, + file_groups, + self.config.unbounded_input, + compression, + Box::new(get_serializer), + ) + .await + } + + async fn multipartput_all( + &self, + data: SendableRecordBatchStream, + context: &Arc, + ) -> Result { + let writer_options = self.config.file_type_writer_options.try_into_csv()?; + let builder = &writer_options.writer_options; + + let builder_clone = builder.clone(); + let options_clone = writer_options.clone(); + let get_serializer = move || { + let inner_clone = builder_clone.clone(); + let serializer: Box = Box::new( + CsvSerializer::new() + .with_builder(inner_clone) + .with_header(options_clone.has_header), + ); + serializer + }; + + stateless_multipart_put( + data, + context, + "csv".into(), + Box::new(get_serializer), + &self.config, + writer_options.compression.into(), + ) + .await + } } #[async_trait] @@ -495,116 +569,22 @@ impl DataSink for CsvSink { async fn write_all( &self, - data: Vec, + data: SendableRecordBatchStream, context: &Arc, ) -> Result { - let num_partitions = data.len(); - let writer_options = self.config.file_type_writer_options.try_into_csv()?; - let (builder, compression) = - (&writer_options.writer_options, &writer_options.compression); - let mut has_header = writer_options.has_header; - let compression = FileCompressionType::from(*compression); - - let object_store = context - .runtime_env() - .object_store(&self.config.object_store_url)?; - // Construct serializer and writer for each file group - let mut serializers: Vec> = vec![]; - let mut writers = vec![]; match self.config.writer_mode { FileWriterMode::Append => { - for file_group in &self.config.file_groups { - let mut append_builder = builder.clone(); - // In append mode, consider has_header flag only when file is empty (at the start). - // For other modes, use has_header flag as is. - if file_group.object_meta.size != 0 { - has_header = false; - append_builder = append_builder.has_headers(false); - } - let serializer = CsvSerializer::new() - .with_builder(append_builder) - .with_header(has_header); - serializers.push(Box::new(serializer)); - - let file = file_group.clone(); - let writer = create_writer( - self.config.writer_mode, - compression, - file.object_meta.clone().into(), - object_store.clone(), - ) - .await?; - writers.push(writer); - } - } - FileWriterMode::Put => { - return not_impl_err!("Put Mode is not implemented for CSV Sink yet") + let total_count = self.append_all(data, context).await?; + Ok(total_count) } FileWriterMode::PutMultipart => { - // Currently assuming only 1 partition path (i.e. 
not hive-style partitioning on a column) - let base_path = &self.config.table_paths[0]; - match self.config.single_file_output { - false => { - // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files - let write_id = - Alphanumeric.sample_string(&mut rand::thread_rng(), 16); - for part_idx in 0..num_partitions { - let serializer = CsvSerializer::new() - .with_builder(builder.clone()) - .with_header(has_header); - serializers.push(Box::new(serializer)); - let file_path = base_path - .prefix() - .child(format!("{}_{}.csv", write_id, part_idx)); - let object_meta = ObjectMeta { - location: file_path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = create_writer( - self.config.writer_mode, - compression, - object_meta.into(), - object_store.clone(), - ) - .await?; - writers.push(writer); - } - } - true => { - let serializer = CsvSerializer::new() - .with_builder(builder.clone()) - .with_header(has_header); - serializers.push(Box::new(serializer)); - let file_path = base_path.prefix(); - let object_meta = ObjectMeta { - location: file_path.clone(), - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = create_writer( - self.config.writer_mode, - compression, - object_meta.into(), - object_store.clone(), - ) - .await?; - writers.push(writer); - } - } + let total_count = self.multipartput_all(data, context).await?; + Ok(total_count) + } + FileWriterMode::Put => { + return not_impl_err!("FileWriterMode::Put is not supported yet!") } } - - stateless_serialize_and_write_files( - data, - serializers, - writers, - self.config.single_file_output, - self.config.unbounded_input, - ) - .await } } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index dc5b24b2ea10..e1f8ab0d57b7 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -23,11 +23,10 @@ use std::fmt::Debug; use std::io::BufReader; use std::sync::Arc; +use super::write::{stateless_append_all, stateless_multipart_put}; use super::{FileFormat, FileScanConfig}; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::file_format::write::{ - create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode, -}; +use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode}; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, NdJsonExec}; use crate::error::Result; @@ -49,7 +48,6 @@ use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; use bytes::{Buf, Bytes}; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; -use rand::distributions::{Alphanumeric, DistString}; /// New line delimited JSON `FileFormat` implementation. 
#[derive(Debug)] @@ -172,7 +170,7 @@ impl FileFormat for JsonFormat { return not_impl_err!("Inserting compressed JSON is not implemented yet."); } let sink_schema = conf.output_schema().clone(); - let sink = Arc::new(JsonSink::new(conf, self.file_compression_type)); + let sink = Arc::new(JsonSink::new(conf)); Ok(Arc::new(FileSinkExec::new( input, @@ -226,14 +224,11 @@ impl BatchSerializer for JsonSerializer { struct JsonSink { /// Config options for writing data config: FileSinkConfig, - file_compression_type: FileCompressionType, } impl Debug for JsonSink { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("JsonSink") - .field("file_compression_type", &self.file_compression_type) - .finish() + f.debug_struct("JsonSink").finish() } } @@ -254,11 +249,62 @@ impl DisplayAs for JsonSink { } impl JsonSink { - fn new(config: FileSinkConfig, file_compression_type: FileCompressionType) -> Self { - Self { - config, - file_compression_type, - } + fn new(config: FileSinkConfig) -> Self { + Self { config } + } + + async fn append_all( + &self, + data: SendableRecordBatchStream, + context: &Arc, + ) -> Result { + let writer_options = self.config.file_type_writer_options.try_into_json()?; + let compression = &writer_options.compression; + + let object_store = context + .runtime_env() + .object_store(&self.config.object_store_url)?; + let file_groups = &self.config.file_groups; + + let get_serializer = move |_| { + let serializer: Box = Box::new(JsonSerializer::new()); + serializer + }; + + stateless_append_all( + data, + context, + object_store, + file_groups, + self.config.unbounded_input, + (*compression).into(), + Box::new(get_serializer), + ) + .await + } + + async fn multipartput_all( + &self, + data: SendableRecordBatchStream, + context: &Arc, + ) -> Result { + let writer_options = self.config.file_type_writer_options.try_into_json()?; + let compression = &writer_options.compression; + + let get_serializer = move || { + let serializer: Box = Box::new(JsonSerializer::new()); + serializer + }; + + stateless_multipart_put( + data, + context, + "json".into(), + Box::new(get_serializer), + &self.config, + (*compression).into(), + ) + .await } } @@ -274,106 +320,22 @@ impl DataSink for JsonSink { async fn write_all( &self, - data: Vec, + data: SendableRecordBatchStream, context: &Arc, ) -> Result { - let num_partitions = data.len(); - - let object_store = context - .runtime_env() - .object_store(&self.config.object_store_url)?; - - let writer_options = self.config.file_type_writer_options.try_into_json()?; - - let compression = FileCompressionType::from(writer_options.compression); - - // Construct serializer and writer for each file group - let mut serializers: Vec> = vec![]; - let mut writers = vec![]; match self.config.writer_mode { FileWriterMode::Append => { - if self.config.single_file_output { - return Err(DataFusionError::NotImplemented("single_file_output=true is not implemented for JsonSink in Append mode".into())); - } - for file_group in &self.config.file_groups { - let serializer = JsonSerializer::new(); - serializers.push(Box::new(serializer)); - - let file = file_group.clone(); - let writer = create_writer( - self.config.writer_mode, - compression, - file.object_meta.clone().into(), - object_store.clone(), - ) - .await?; - writers.push(writer); - } - } - FileWriterMode::Put => { - return not_impl_err!("Put Mode is not implemented for Json Sink yet") + let total_count = self.append_all(data, context).await?; + Ok(total_count) } FileWriterMode::PutMultipart => { - 
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column) - let base_path = &self.config.table_paths[0]; - match self.config.single_file_output { - false => { - // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files - let write_id = - Alphanumeric.sample_string(&mut rand::thread_rng(), 16); - for part_idx in 0..num_partitions { - let serializer = JsonSerializer::new(); - serializers.push(Box::new(serializer)); - let file_path = base_path - .prefix() - .child(format!("{}_{}.json", write_id, part_idx)); - let object_meta = ObjectMeta { - location: file_path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = create_writer( - self.config.writer_mode, - compression, - object_meta.into(), - object_store.clone(), - ) - .await?; - writers.push(writer); - } - } - true => { - let serializer = JsonSerializer::new(); - serializers.push(Box::new(serializer)); - let file_path = base_path.prefix(); - let object_meta = ObjectMeta { - location: file_path.clone(), - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = create_writer( - self.config.writer_mode, - compression, - object_meta.into(), - object_store.clone(), - ) - .await?; - writers.push(writer); - } - } + let total_count = self.multipartput_all(data, context).await?; + Ok(total_count) + } + FileWriterMode::Put => { + return not_impl_err!("FileWriterMode::Put is not supported yet!") } } - - stateless_serialize_and_write_files( - data, - serializers, - writers, - self.config.single_file_output, - self.config.unbounded_input, - ) - .await } } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 859bff7ae46f..12d5d515bbcd 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -23,7 +23,7 @@ use std::fmt::Debug; use std::io::Write; use std::sync::Arc; -use super::write::{create_writer, AbortableWrite, FileWriterMode}; +use super::write::{create_writer, start_demuxer_task, AbortableWrite, FileWriterMode}; use super::{FileFormat, FileScanConfig}; use crate::config::ConfigOptions; @@ -62,7 +62,6 @@ use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use parquet::file::writer::SerializedFileWriter; -use rand::distributions::{Alphanumeric, DistString}; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::task::{JoinHandle, JoinSet}; @@ -641,79 +640,6 @@ impl ParquetSink { } } - /// Creates an AsyncArrowWriter for each partition to be written out - /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized - async fn create_all_async_arrow_writers( - &self, - num_partitions: usize, - parquet_props: &WriterProperties, - object_store: Arc, - ) -> Result< - Vec>>, - > { - // Construct writer for each file group - let mut writers = vec![]; - match self.config.writer_mode { - FileWriterMode::Append => { - return plan_err!( - "Parquet format does not support appending to existing file!" - ) - } - FileWriterMode::Put => { - return not_impl_err!("Put Mode is not implemented for ParquetSink yet") - } - FileWriterMode::PutMultipart => { - // Currently assuming only 1 partition path (i.e. 
not hive-style partitioning on a column) - let base_path = &self.config.table_paths[0]; - match self.config.single_file_output { - false => { - // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files - let write_id = - Alphanumeric.sample_string(&mut rand::thread_rng(), 16); - for part_idx in 0..num_partitions { - let file_path = base_path - .prefix() - .child(format!("{}_{}.parquet", write_id, part_idx)); - let object_meta = ObjectMeta { - location: file_path, - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = self - .create_async_arrow_writer( - object_meta.into(), - object_store.clone(), - parquet_props.clone(), - ) - .await?; - writers.push(writer); - } - } - true => { - let file_path = base_path.prefix(); - let object_meta = ObjectMeta { - location: file_path.clone(), - last_modified: chrono::offset::Utc::now(), - size: 0, - e_tag: None, - }; - let writer = self - .create_async_arrow_writer( - object_meta.into(), - object_store.clone(), - parquet_props.clone(), - ) - .await?; - writers.push(writer); - } - } - } - } - - Ok(writers) - } - /// Creates an object store writer for each output partition /// This is used when parallelizing individual parquet file writes. async fn create_object_store_writers( @@ -758,10 +684,9 @@ impl DataSink for ParquetSink { async fn write_all( &self, - mut data: Vec, + data: SendableRecordBatchStream, context: &Arc, ) -> Result { - let num_partitions = data.len(); let parquet_props = self .config .file_type_writer_options @@ -772,63 +697,93 @@ impl DataSink for ParquetSink { .runtime_env() .object_store(&self.config.object_store_url)?; - let mut row_count = 0; + let exec_options = &context.session_config().options().execution; + + let allow_single_file_parallelism = + exec_options.parquet.allow_single_file_parallelism; + + // This is a temporary special case until https://github.com/apache/arrow-datafusion/pull/7655 + // can be pulled in. + if allow_single_file_parallelism && self.config.single_file_output { + let object_store_writer = self + .create_object_store_writers(1, object_store) + .await? + .remove(0); + + let schema_clone = self.config.output_schema.clone(); + return output_single_parquet_file_parallelized( + object_store_writer, + vec![data], + schema_clone, + parquet_props, + ) + .await + .map(|r| r as u64); + } - let allow_single_file_parallelism = context - .session_config() - .options() - .execution - .parquet - .allow_single_file_parallelism; - - match self.config.single_file_output { - false => { - let writers = self - .create_all_async_arrow_writers( - num_partitions, - parquet_props, - object_store.clone(), - ) - .await?; - // TODO parallelize individual parquet serialization when already outputting multiple parquet files - // e.g. if outputting 2 parquet files on a system with 32 threads, spawn 16 tasks for each individual - // file to be serialized. - row_count = output_multiple_parquet_files(writers, data).await?; - } - true => { - if !allow_single_file_parallelism || data.len() <= 1 { - let mut writer = self - .create_all_async_arrow_writers( - num_partitions, - parquet_props, - object_store.clone(), - ) - .await? - .remove(0); - for data_stream in data.iter_mut() { - while let Some(batch) = data_stream.next().await.transpose()? 
{ - row_count += batch.num_rows(); - writer.write(&batch).await?; - } + let (demux_task, mut file_stream_rx) = start_demuxer_task( + data, + context, + None, + self.config.table_paths[0].clone(), + "parquet".into(), + self.config.single_file_output, + ); + + let mut file_write_tasks: JoinSet> = + JoinSet::new(); + while let Some((path, mut rx)) = file_stream_rx.recv().await { + let mut writer = self + .create_async_arrow_writer( + ObjectMeta { + location: path, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, } + .into(), + object_store.clone(), + parquet_props.clone(), + ) + .await?; - writer.close().await?; - } else { - let object_store_writer = self - .create_object_store_writers(1, object_store) - .await? - .remove(0); - row_count = output_single_parquet_file_parallelized( - object_store_writer, - data, - self.config.output_schema.clone(), - parquet_props, - ) - .await?; + file_write_tasks.spawn(async move { + let mut row_count = 0; + while let Some(batch) = rx.recv().await { + row_count += batch.num_rows(); + writer.write(&batch).await?; + } + writer.close().await?; + Ok(row_count) + }); + } + + let mut row_count = 0; + while let Some(result) = file_write_tasks.join_next().await { + match result { + Ok(r) => { + row_count += r?; + } + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } } } } + match demux_task.await { + Ok(r) => r?, + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } + } + } Ok(row_count as u64) } } @@ -974,48 +929,6 @@ async fn output_single_parquet_file_parallelized( }; object_store_writer.write_all(final_buff.as_slice()).await?; object_store_writer.shutdown().await?; - println!("done!"); - - Ok(row_count) -} - -/// Serializes multiple parquet files independently in parallel from different RecordBatch streams. -/// AsyncArrowWriter is used to coordinate serialization and MultiPart puts to ObjectStore -/// Only a single CPU thread is used to serialize each individual parquet file, so write speed and overall -/// CPU utilization is dependent on the number of output files. -async fn output_multiple_parquet_files( - writers: Vec< - AsyncArrowWriter>, - >, - data: Vec, -) -> Result { - let mut row_count = 0; - let mut join_set: JoinSet> = JoinSet::new(); - for (mut data_stream, mut writer) in data.into_iter().zip(writers.into_iter()) { - join_set.spawn(async move { - let mut cnt = 0; - while let Some(batch) = data_stream.next().await.transpose()? 
{ - cnt += batch.num_rows(); - writer.write(&batch).await?; - } - writer.close().await?; - Ok(cnt) - }); - } - while let Some(result) = join_set.join_next().await { - match result { - Ok(res) => { - row_count += res?; - } // propagate DataFusion error - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } else { - unreachable!(); - } - } - } - } Ok(row_count) } diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs index 42d18eef634c..928d0d1ba595 100644 --- a/datafusion/core/src/datasource/file_format/write.rs +++ b/datafusion/core/src/datasource/file_format/write.rs @@ -25,24 +25,27 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::datasource::file_format::file_compression_type::FileCompressionType; -use crate::datasource::physical_plan::FileMeta; +use crate::datasource::listing::{ListingTableUrl, PartitionedFile}; +use crate::datasource::physical_plan::{FileMeta, FileSinkConfig}; use crate::error::Result; use crate::physical_plan::SendableRecordBatchStream; use arrow_array::RecordBatch; -use datafusion_common::{exec_err, internal_err, DataFusionError}; +use datafusion_common::{exec_err, DataFusionError}; use async_trait::async_trait; use bytes::Bytes; -use datafusion_execution::RecordBatchStream; +use datafusion_execution::TaskContext; use futures::future::BoxFuture; use futures::FutureExt; use futures::{ready, StreamExt}; use object_store::path::Path; use object_store::{MultipartId, ObjectMeta, ObjectStore}; +use rand::distributions::DistString; use tokio::io::{AsyncWrite, AsyncWriteExt}; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task::{JoinHandle, JoinSet}; +use tokio::try_join; /// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores. /// It is specifically designed for the `object_store` crate's `put` method and sends @@ -308,22 +311,20 @@ type SerializerType = Box; /// concurrently. Data order is preserved. In the event of an error, /// the ObjectStore writer is returned to the caller in addition to an error, /// so that the caller may handle aborting failed writes. 
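The doc comment above promises concurrent serialization with preserved output order. The approach the function below takes is to spawn one serialization task per incoming batch and push the `JoinHandle`s through a bounded channel, so the writer awaits them in submission order while later batches are still being serialized. A self-contained sketch of that pattern under those assumptions (plain tokio, no DataFusion or object_store types):

```rust
// Plain tokio, no DataFusion types: spawn one serialization task per batch and
// queue the JoinHandles, so serialization overlaps while output order is kept.
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // The bounded handle queue caps how far serialization can run ahead of the
    // writer while still preserving submission order.
    let (tx, mut rx) = mpsc::channel::<JoinHandle<Vec<u8>>>(100);

    let producer = tokio::spawn(async move {
        for batch in 0..10u32 {
            let handle = tokio::spawn(async move {
                // Stand-in for `serializer.serialize(batch)`.
                format!("serialized batch {batch}\n").into_bytes()
            });
            if tx.send(handle).await.is_err() {
                break; // writer hung up
            }
        }
    });

    // "Writer": await handles in the order they were spawned, then write bytes.
    let mut output = Vec::new();
    while let Some(handle) = rx.recv().await {
        let bytes = handle.await.expect("serialization task panicked");
        output.extend_from_slice(&bytes);
    }
    producer.await.unwrap();
    assert!(String::from_utf8(output).unwrap().starts_with("serialized batch 0\n"));
}
```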
-async fn serialize_rb_stream_to_object_store( - mut data_stream: Pin>, +pub(crate) async fn serialize_rb_stream_to_object_store( + mut data_rx: Receiver, mut serializer: Box, mut writer: AbortableWrite>, unbounded_input: bool, -) -> std::result::Result<(SerializerType, WriterType, u64), (WriterType, DataFusionError)> -{ +) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> { let (tx, mut rx) = mpsc::channel::>>(100); let serialize_task = tokio::spawn(async move { - while let Some(maybe_batch) = data_stream.next().await { + while let Some(batch) = data_rx.recv().await { match serializer.duplicate() { Ok(mut serializer_clone) => { let handle = tokio::spawn(async move { - let batch = maybe_batch?; let num_rows = batch.num_rows(); let bytes = serializer_clone.serialize(batch).await?; Ok((num_rows, bytes)) @@ -344,7 +345,7 @@ async fn serialize_rb_stream_to_object_store( } } } - Ok(serializer) + Ok(()) }); let mut row_count = 0; @@ -380,8 +381,8 @@ async fn serialize_rb_stream_to_object_store( } } - let serializer = match serialize_task.await { - Ok(Ok(serializer)) => serializer, + match serialize_task.await { + Ok(Ok(_)) => (), Ok(Err(e)) => return Err((writer, e)), Err(_) => { return Err(( @@ -390,29 +391,166 @@ async fn serialize_rb_stream_to_object_store( )) } }; - Ok((serializer, writer, row_count as u64)) + Ok((writer, row_count as u64)) } +type RecordBatchReceiver = Receiver; +type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>; + +/// Splits a single [SendableRecordBatchStream] into a dynamically determined +/// number of partitions at execution time. The partitions are determined by +/// factors known only at execution time, such as total number of rows and +/// partition column values. The demuxer task communicates to the caller +/// by sending channels over a channel. The inner channels send RecordBatches +/// which should be contained within the same output file. The outer channel +/// is used to send a dynamic number of inner channels, representing a dynamic +/// number of total output files. The caller is also responsible to monitor +/// the demux task for errors and abort accordingly. The single_file_ouput parameter +/// overrides all other settings to force only a single file to be written. +/// partition_by parameter will additionally split the input based on the unique +/// values of a specific column ``` +/// ┌───────────┐ ┌────────────┐ ┌─────────────┐ +/// ┌──────▶ │ batch 1 ├────▶...──────▶│ Batch a │ │ Output File1│ +/// │ └───────────┘ └────────────┘ └─────────────┘ +/// │ +/// ┌──────────┐ │ ┌───────────┐ ┌────────────┐ ┌─────────────┐ +/// ┌───────────┐ ┌────────────┐ │ │ ├──────▶ │ batch a+1├────▶...──────▶│ Batch b │ │ Output File2│ +/// │ batch 1 ├────▶...──────▶│ Batch N ├─────▶│ Demux ├────────┤ ... 
└───────────┘ └────────────┘ └─────────────┘ +/// └───────────┘ └────────────┘ │ │ │ +/// └──────────┘ │ ┌───────────┐ ┌────────────┐ ┌─────────────┐ +/// └──────▶ │ batch d ├────▶...──────▶│ Batch n │ │ Output FileN│ +/// └───────────┘ └────────────┘ └─────────────┘ +pub(crate) fn start_demuxer_task( + input: SendableRecordBatchStream, + context: &Arc, + _partition_by: Option<&str>, + base_output_path: ListingTableUrl, + file_extension: String, + single_file_output: bool, +) -> (JoinHandle>, DemuxedStreamReceiver) { + let exec_options = &context.session_config().options().execution; + + let max_rows_per_file = exec_options.soft_max_rows_per_output_file; + let max_parallel_files = exec_options.max_parallel_ouput_files; + let max_buffered_batches = exec_options.max_buffered_batches_per_output_file; + + let (tx, rx) = mpsc::channel(max_parallel_files); + + let task = tokio::spawn(async move { + row_count_demuxer( + input, + base_output_path, + file_extension, + single_file_output, + max_rows_per_file, + max_buffered_batches, + tx, + ) + .await + }); + (task, rx) +} + +fn generate_file_path( + base_output_path: &ListingTableUrl, + write_id: &str, + part_idx: usize, + file_extension: &str, + single_file_output: bool, +) -> Path { + if !single_file_output { + base_output_path + .prefix() + .child(format!("{}_{}.{}", write_id, part_idx, file_extension)) + } else { + base_output_path.prefix().to_owned() + } +} + +async fn create_new_file_stream( + base_output_path: &ListingTableUrl, + write_id: &str, + part_idx: usize, + file_extension: &str, + single_file_output: bool, + max_buffered_batches: usize, + tx: &mut Sender<(Path, Receiver)>, +) -> Result> { + let file_path = generate_file_path( + base_output_path, + write_id, + part_idx, + file_extension, + single_file_output, + ); + let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2); + tx.send((file_path, rx_file)).await.map_err(|_| { + DataFusionError::Execution("Error sending RecordBatch to file stream!".into()) + })?; + Ok(tx_file) +} + +async fn row_count_demuxer( + mut input: SendableRecordBatchStream, + base_output_path: ListingTableUrl, + file_extension: String, + single_file_output: bool, + max_rows_per_file: usize, + max_buffered_batches: usize, + mut tx: Sender<(Path, Receiver)>, +) -> Result<()> { + let mut total_rows_current_file = 0; + let mut part_idx = 0; + let write_id = + rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + + let mut tx_file = create_new_file_stream( + &base_output_path, + &write_id, + part_idx, + &file_extension, + single_file_output, + max_buffered_batches, + &mut tx, + ) + .await?; + part_idx += 1; + + while let Some(rb) = input.next().await.transpose()? { + total_rows_current_file += rb.num_rows(); + tx_file.send(rb).await.map_err(|_| { + DataFusionError::Execution("Error sending RecordBatch to file stream!".into()) + })?; + + if total_rows_current_file >= max_rows_per_file && !single_file_output { + total_rows_current_file = 0; + tx_file = create_new_file_stream( + &base_output_path, + &write_id, + part_idx, + &file_extension, + single_file_output, + max_buffered_batches, + &mut tx, + ) + .await?; + part_idx += 1; + } + } + Ok(()) +} + +type FileWriteBundle = (Receiver, SerializerType, WriterType); /// Contains the common logic for serializing RecordBatches and /// writing the resulting bytes to an ObjectStore. /// Serialization is assumed to be stateless, i.e. /// each RecordBatch can be serialized without any /// dependency on the RecordBatches before or after. 
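The demultiplexer above communicates by sending channels over a channel: the outer channel hands the consumer a `(path, receiver)` pair for each output file, and `row_count_demuxer` rolls over to a fresh inner channel once the row budget for the current file is reached. A self-contained sketch of that handoff with stand-in types (`String` paths and `usize` row counts in place of `Path` and `RecordBatch`; the constants stand in for the new config options):

```rust
// Stand-in types: String instead of object_store::path::Path, usize row counts
// instead of RecordBatch. Illustrates the channel-of-channels handoff only.
use tokio::sync::mpsc::{self, Receiver, Sender};

const MAX_ROWS_PER_FILE: usize = 5; // stand-in for soft_max_rows_per_output_file

async fn open_file(
    tx: &Sender<(String, Receiver<usize>)>,
    part_idx: &mut usize,
) -> Sender<usize> {
    // One bounded inner channel per output file (stand-in for
    // max_buffered_batches_per_output_file).
    let (file_tx, file_rx) = mpsc::channel(2);
    let _ = tx.send((format!("part_{}.csv", *part_idx), file_rx)).await;
    *part_idx += 1;
    file_tx
}

async fn demux(tx: Sender<(String, Receiver<usize>)>) {
    let (mut part_idx, mut rows_in_file) = (0, 0);
    let mut file_tx = open_file(&tx, &mut part_idx).await;
    // Pretend each incoming batch holds two rows.
    for batch_rows in std::iter::repeat(2usize).take(8) {
        rows_in_file += batch_rows;
        let _ = file_tx.send(batch_rows).await;
        if rows_in_file >= MAX_ROWS_PER_FILE {
            // Row budget reached: drop the old sender (closing that file's
            // stream) and hand the consumer a channel for the next file.
            rows_in_file = 0;
            file_tx = open_file(&tx, &mut part_idx).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8); // outer channel of per-file channels
    let demux_task = tokio::spawn(demux(tx));
    // Consumer: in the real sink this is where a writer is created per file.
    while let Some((path, mut file_rx)) = rx.recv().await {
        let mut rows = 0;
        while let Some(batch_rows) = file_rx.recv().await {
            rows += batch_rows;
        }
        println!("{path}: {rows} rows");
    }
    demux_task.await.unwrap();
}
```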
pub(crate) async fn stateless_serialize_and_write_files( - data: Vec, - mut serializers: Vec, - mut writers: Vec, - single_file_output: bool, + mut rx: Receiver, + tx: tokio::sync::oneshot::Sender, unbounded_input: bool, -) -> Result { - if single_file_output && (serializers.len() != 1 || writers.len() != 1) { - return internal_err!("single_file_output is true, but got more than 1 writer!"); - } - let num_partitions = data.len(); - let num_writers = writers.len(); - if !single_file_output && (num_partitions != num_writers) { - return internal_err!("single_file_ouput is false, but did not get 1 writer for each output partition!"); - } +) -> Result<()> { let mut row_count = 0; // tracks if any writers encountered an error triggering the need to abort let mut any_errors = false; @@ -421,100 +559,58 @@ pub(crate) async fn stateless_serialize_and_write_files( // tracks if any errors were encountered in the process of aborting writers. // if true, we may not have a guarentee that all written data was cleaned up. let mut any_abort_errors = false; - match single_file_output { - false => { - let mut join_set = JoinSet::new(); - for (data_stream, serializer, writer) in data - .into_iter() - .zip(serializers.into_iter()) - .zip(writers.into_iter()) - .map(|((a, b), c)| (a, b, c)) - { - join_set.spawn(async move { - serialize_rb_stream_to_object_store( - data_stream, - serializer, - writer, - unbounded_input, - ) - .await - }); - } - let mut finished_writers = Vec::with_capacity(num_writers); - while let Some(result) = join_set.join_next().await { - match result { - Ok(res) => match res { - Ok((_, writer, cnt)) => { - finished_writers.push(writer); - row_count += cnt; - } - Err((writer, e)) => { - finished_writers.push(writer); - any_errors = true; - triggering_error = Some(e); - } - }, - Err(e) => { - // Don't panic, instead try to clean up as many writers as possible. - // If we hit this code, ownership of a writer was not joined back to - // this thread, so we cannot clean it up (hence any_abort_errors is true) - any_errors = true; - any_abort_errors = true; - triggering_error = Some(DataFusionError::Internal(format!( - "Unexpected join error while serializing file {e}" - ))); - } + let mut join_set = JoinSet::new(); + while let Some((data_rx, serializer, writer)) = rx.recv().await { + join_set.spawn(async move { + serialize_rb_stream_to_object_store( + data_rx, + serializer, + writer, + unbounded_input, + ) + .await + }); + } + let mut finished_writers = Vec::new(); + while let Some(result) = join_set.join_next().await { + match result { + Ok(res) => match res { + Ok((writer, cnt)) => { + finished_writers.push(writer); + row_count += cnt; } - } - - // Finalize or abort writers as appropriate - for mut writer in finished_writers.into_iter() { - match any_errors { - true => { - let abort_result = writer.abort_writer(); - if abort_result.is_err() { - any_abort_errors = true; - } - } - false => { - writer.shutdown() - .await - .map_err(|_| DataFusionError::Internal("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!".into()))?; - } + Err((writer, e)) => { + finished_writers.push(writer); + any_errors = true; + triggering_error = Some(e); } + }, + Err(e) => { + // Don't panic, instead try to clean up as many writers as possible. 
+ // If we hit this code, ownership of a writer was not joined back to + // this thread, so we cannot clean it up (hence any_abort_errors is true) + any_errors = true; + any_abort_errors = true; + triggering_error = Some(DataFusionError::Internal(format!( + "Unexpected join error while serializing file {e}" + ))); } } - true => { - let mut writer = writers.remove(0); - let mut serializer = serializers.remove(0); - let mut cnt; - for data_stream in data.into_iter() { - (serializer, writer, cnt) = match serialize_rb_stream_to_object_store( - data_stream, - serializer, - writer, - unbounded_input, - ) - .await - { - Ok((s, w, c)) => (s, w, c), - Err((w, e)) => { - any_errors = true; - triggering_error = Some(e); - writer = w; - break; - } - }; - row_count += cnt; - } - match any_errors { - true => { - let abort_result = writer.abort_writer(); - if abort_result.is_err() { - any_abort_errors = true; - } + } + + // Finalize or abort writers as appropriate + for mut writer in finished_writers.into_iter() { + match any_errors { + true => { + let abort_result = writer.abort_writer(); + if abort_result.is_err() { + any_abort_errors = true; } - false => writer.shutdown().await?, + } + false => { + writer.shutdown() + .await + .map_err(|_| DataFusionError::Internal("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!".into()))?; } } } @@ -529,5 +625,190 @@ pub(crate) async fn stateless_serialize_and_write_files( } } - Ok(row_count) + tx.send(row_count).map_err(|_| { + DataFusionError::Internal( + "Error encountered while sending row count back to file sink!".into(), + ) + })?; + Ok(()) +} + +/// Orchestrates multipart put of a dynamic number of output files from a single input stream +/// for any statelessly serialized file type. That is, any file type for which each [RecordBatch] +/// can be serialized independently of all other [RecordBatch]s. 
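The function that follows feeds one `(receiver, serializer, writer)` bundle per output file to a single coordinator task and receives the total row count back over a oneshot channel, re-raising any task panic after joining. A stripped-down, self-contained sketch of that handoff (plain tokio; the names are illustrative, not the DataFusion API):

```rust
// Sketch of the coordination pattern: work arrives over an mpsc channel, the
// final count comes back over a oneshot, and task panics are resurfaced.
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    let (work_tx, mut work_rx) = mpsc::channel::<u64>(4);
    let (count_tx, count_rx) = oneshot::channel::<u64>();

    // "Write coordinator": sums row counts until the work channel closes,
    // then reports the total back over the oneshot channel.
    let coordinator = tokio::spawn(async move {
        let mut total = 0u64;
        while let Some(rows) = work_rx.recv().await {
            total += rows;
        }
        count_tx.send(total).ok();
    });

    for rows in [10u64, 20, 30] {
        work_tx.send(rows).await.unwrap();
    }
    // Dropping the sender signals that no more files are coming.
    drop(work_tx);

    let total = count_rx.await.expect("coordinator dropped the count channel");

    // Join the coordinator; re-raise panics instead of swallowing them.
    if let Err(e) = coordinator.await {
        if e.is_panic() {
            std::panic::resume_unwind(e.into_panic());
        }
    }
    assert_eq!(total, 60);
}
```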
+pub(crate) async fn stateless_multipart_put( + data: SendableRecordBatchStream, + context: &Arc, + file_extension: String, + get_serializer: Box Box + Send>, + config: &FileSinkConfig, + compression: FileCompressionType, +) -> Result { + let object_store = context + .runtime_env() + .object_store(&config.object_store_url)?; + + let single_file_output = config.single_file_output; + let base_output_path = &config.table_paths[0]; + let unbounded_input = config.unbounded_input; + + let (demux_task, mut file_stream_rx) = start_demuxer_task( + data, + context, + None, + base_output_path.clone(), + file_extension, + single_file_output, + ); + + let rb_buffer_size = &context + .session_config() + .options() + .execution + .max_buffered_batches_per_output_file; + + let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(rb_buffer_size / 2); + let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel(); + let write_coordinater_task = tokio::spawn(async move { + stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input) + .await + }); + while let Some((output_location, rb_stream)) = file_stream_rx.recv().await { + let serializer = get_serializer(); + let object_meta = ObjectMeta { + location: output_location, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = create_writer( + FileWriterMode::PutMultipart, + compression, + object_meta.into(), + object_store.clone(), + ) + .await?; + + tx_file_bundle + .send((rb_stream, serializer, writer)) + .await + .map_err(|_| { + DataFusionError::Internal( + "Writer receive file bundle channel closed unexpectedly!".into(), + ) + })?; + } + + // Signal to the write coordinater that no more files are coming + drop(tx_file_bundle); + + let total_count = rx_row_cnt.await.map_err(|_| { + DataFusionError::Internal( + "Did not receieve row count from write coordinater".into(), + ) + })?; + + match try_join!(write_coordinater_task, demux_task) { + Ok((r1, r2)) => { + r1?; + r2?; + } + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } + } + } + + Ok(total_count) +} + +/// Orchestrates append_all for any statelessly serialized file type. Appends to all files provided +/// in a round robin fashion. 
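The append path below deals incoming batches across one bounded channel per pre-existing file, in round-robin order, with one writer task per file. A minimal stand-alone sketch of that fan-out (plain tokio; `String` batches and a print stand in for serialization and the object store):

```rust
// Round-robin fan-out: incoming batches are dealt across one channel per file.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let num_files = 3;
    let mut senders = Vec::new();
    let mut workers = Vec::new();

    // One "appender" task per pre-existing file.
    for file_idx in 0..num_files {
        let (tx, mut rx) = mpsc::channel::<String>(2);
        senders.push(tx);
        workers.push(tokio::spawn(async move {
            let mut appended = 0;
            while let Some(batch) = rx.recv().await {
                // Stand-in for serializing `batch` and appending it to file `file_idx`.
                let _ = batch;
                appended += 1;
            }
            (file_idx, appended)
        }));
    }

    // Deal incoming batches across the files in round-robin order.
    let mut next_file = 0;
    for i in 0..7 {
        senders[next_file].send(format!("batch {i}")).await.unwrap();
        next_file = (next_file + 1) % senders.len();
    }
    drop(senders); // closing the channels lets the appenders finish

    for worker in workers {
        let (file_idx, appended) = worker.await.unwrap();
        println!("file {file_idx}: {appended} batches appended");
    }
}
```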
+pub(crate) async fn stateless_append_all( + mut data: SendableRecordBatchStream, + context: &Arc, + object_store: Arc, + file_groups: &Vec, + unbounded_input: bool, + compression: FileCompressionType, + get_serializer: Box Box + Send>, +) -> Result { + let rb_buffer_size = &context + .session_config() + .options() + .execution + .max_buffered_batches_per_output_file; + + let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(file_groups.len()); + let mut send_channels = vec![]; + for file_group in file_groups { + let serializer = get_serializer(file_group.object_meta.size); + + let file = file_group.clone(); + let writer = create_writer( + FileWriterMode::Append, + compression, + file.object_meta.clone().into(), + object_store.clone(), + ) + .await?; + + let (tx, rx) = tokio::sync::mpsc::channel(rb_buffer_size / 2); + send_channels.push(tx); + tx_file_bundle + .send((rx, serializer, writer)) + .await + .map_err(|_| { + DataFusionError::Internal( + "Writer receive file bundle channel closed unexpectedly!".into(), + ) + })?; + } + + let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel(); + let write_coordinater_task = tokio::spawn(async move { + stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input) + .await + }); + + // Append to file groups in round robin + let mut next_file_idx = 0; + while let Some(rb) = data.next().await.transpose()? { + send_channels[next_file_idx].send(rb).await.map_err(|_| { + DataFusionError::Internal( + "Recordbatch file append stream closed unexpectedly!".into(), + ) + })?; + next_file_idx = (next_file_idx + 1) % send_channels.len(); + if unbounded_input { + tokio::task::yield_now().await; + } + } + // Signal to the write coordinater that no more files are coming + drop(tx_file_bundle); + drop(send_channels); + + let total_count = rx_row_cnt.await.map_err(|_| { + DataFusionError::Internal( + "Did not receieve row count from write coordinater".into(), + ) + })?; + + match try_join!(write_coordinater_task) { + Ok(r1) => { + r1.0?; + } + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } + } + } + + Ok(total_count) } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index ca86e3e3c7e1..05d8ba6c451e 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1615,6 +1615,10 @@ mod tests { async fn test_insert_into_append_new_json_files() -> Result<()> { let mut config_map: HashMap = HashMap::new(); config_map.insert("datafusion.execution.batch_size".into(), "1".into()); + config_map.insert( + "datafusion.execution.soft_max_rows_per_output_file".into(), + "1".into(), + ); helper_test_append_new_files_to_table( FileType::JSON, FileCompressionType::UNCOMPRESSED, @@ -1639,6 +1643,10 @@ mod tests { async fn test_insert_into_append_new_csv_files() -> Result<()> { let mut config_map: HashMap = HashMap::new(); config_map.insert("datafusion.execution.batch_size".into(), "1".into()); + config_map.insert( + "datafusion.execution.soft_max_rows_per_output_file".into(), + "1".into(), + ); helper_test_append_new_files_to_table( FileType::CSV, FileCompressionType::UNCOMPRESSED, @@ -1652,6 +1660,10 @@ mod tests { async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> { let mut config_map: HashMap = HashMap::new(); config_map.insert("datafusion.execution.batch_size".into(), "1".into()); + config_map.insert( + 
"datafusion.execution.soft_max_rows_per_output_file".into(), + "1".into(), + ); helper_test_append_new_files_to_table( FileType::PARQUET, FileCompressionType::UNCOMPRESSED, @@ -1782,6 +1794,11 @@ mod tests { #[tokio::test] async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> { let mut config_map: HashMap = HashMap::new(); + config_map.insert("datafusion.execution.batch_size".into(), "1".into()); + config_map.insert( + "datafusion.execution.soft_max_rows_per_output_file".into(), + "1".into(), + ); config_map.insert( "datafusion.execution.parquet.compression".into(), "zstd(5)".into(), @@ -2090,7 +2107,6 @@ mod tests { } None => SessionContext::new(), }; - let target_partition_number = session_ctx.state().config().target_partitions(); // Create a new schema with one field called "a" of type Int32 let schema = Arc::new(Schema::new(vec![Field::new( @@ -2230,7 +2246,7 @@ mod tests { // Assert that `target_partition_number` many files were added to the table. let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, target_partition_number); + assert_eq!(num_files, 3); // Create a physical plan from the insert plan let plan = session_ctx @@ -2273,7 +2289,7 @@ mod tests { // Assert that another `target_partition_number` many files were added to the table. let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 2 * target_partition_number); + assert_eq!(num_files, 6); // Return Ok if the function Ok(()) diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index ba99a2b695b6..a2f8e225e121 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -270,7 +270,7 @@ impl DataSink for MemSink { async fn write_all( &self, - mut data: Vec, + mut data: SendableRecordBatchStream, _context: &Arc, ) -> Result { let num_partitions = self.batches.len(); @@ -280,14 +280,10 @@ impl DataSink for MemSink { let mut new_batches = vec![vec![]; num_partitions]; let mut i = 0; let mut row_count = 0; - let num_parts = data.len(); - // TODO parallelize outer and inner loops - for data_part in data.iter_mut().take(num_parts) { - while let Some(batch) = data_part.next().await.transpose()? { - row_count += batch.num_rows(); - new_batches[i].push(batch); - i = (i + 1) % num_partitions; - } + while let Some(batch) = data.next().await.transpose()? 
{ + row_count += batch.num_rows(); + new_batches[i].push(batch); + i = (i + 1) % num_partitions; } // write the outputs into the batches diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index c781ad81c172..e59686453f0e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1976,24 +1976,7 @@ mod tests { ParquetReadOptions::default(), ) .await?; - ctx.register_parquet( - "part1", - &format!("{out_dir}/{write_id}_1.parquet"), - ParquetReadOptions::default(), - ) - .await?; - ctx.register_parquet( - "part2", - &format!("{out_dir}/{write_id}_2.parquet"), - ParquetReadOptions::default(), - ) - .await?; - ctx.register_parquet( - "part3", - &format!("{out_dir}/{write_id}_3.parquet"), - ParquetReadOptions::default(), - ) - .await?; + ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default()) .await?; diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 5b58a0a77134..d1f2706930d2 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -36,7 +36,7 @@ use arrow_array::{ArrayRef, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion_common::{exec_err, internal_err, DataFusionError, Result}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::PhysicalSortRequirement; +use datafusion_physical_expr::{Distribution, PhysicalSortRequirement}; use async_trait::async_trait; use futures::StreamExt; @@ -68,7 +68,7 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync { /// or rollback required. async fn write_all( &self, - data: Vec, + data: SendableRecordBatchStream, context: &Arc, ) -> Result; } @@ -152,18 +152,6 @@ impl FileSinkExec { } } - fn execute_all_input_streams( - &self, - context: Arc, - ) -> Result> { - let n_input_parts = self.input.output_partitioning().partition_count(); - let mut streams = Vec::with_capacity(n_input_parts); - for part in 0..n_input_parts { - streams.push(self.execute_input_stream(part, context.clone())?); - } - Ok(streams) - } - /// Returns insert sink pub fn sink(&self) -> &dyn DataSink { self.sink.as_ref() @@ -210,13 +198,17 @@ impl ExecutionPlan for FileSinkExec { } fn benefits_from_input_partitioning(&self) -> Vec { - // Incoming number of partitions is taken to be the - // number of files the query is required to write out. - // The optimizer should not change this number. - // Parrallelism is handled within the appropriate DataSink + // DataSink is responsible for dynamically partitioning its + // own input at execution time. vec![false] } + fn required_input_distribution(&self) -> Vec { + // DataSink is responsible for dynamically partitioning its + // own input at execution time, and so requires a single input partition. 
+ vec![Distribution::SinglePartition; self.children().len()] + } + fn required_input_ordering(&self) -> Vec>> { // The input order is either exlicitly set (such as by a ListingTable), // or require that the [FileSinkExec] gets the data in the order the @@ -269,7 +261,7 @@ impl ExecutionPlan for FileSinkExec { if partition != 0 { return internal_err!("FileSinkExec can only be called on partition 0!"); } - let data = self.execute_all_input_streams(context.clone())?; + let data = self.execute_input_stream(0, context.clone())?; let count_schema = self.count_schema.clone(); let sink = self.sink.clone(); diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index d2ec21488d3d..8e22ad833f7c 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -150,6 +150,8 @@ datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false +datafusion.execution.max_buffered_batches_per_output_file 2 +datafusion.execution.max_parallel_ouput_files 8 datafusion.execution.meta_fetch_concurrency 32 datafusion.execution.parquet.allow_single_file_parallelism false datafusion.execution.parquet.bloom_filter_enabled false @@ -175,6 +177,7 @@ datafusion.execution.parquet.statistics_enabled NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.planning_concurrency 13 +datafusion.execution.soft_max_rows_per_output_file 50000000 datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 datafusion.execution.target_partitions 7 @@ -217,6 +220,8 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files +datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption +datafusion.execution.max_parallel_ouput_files 8 This is the maximum number of output files being written in parallel. Higher values can potentially give faster write performance at the cost of higher peak memory consumption. datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics datafusion.execution.parquet.allow_single_file_parallelism false Controls whether DataFusion will attempt to speed up writing large parquet files by first writing multiple smaller files and then stitching them together into a single large file. This will result in faster write speeds, but higher memory usage. 
Also currently unsupported are bloom filters and column indexes when single_file_parallelism is enabled. datafusion.execution.parquet.bloom_filter_enabled false Sets if bloom filter is enabled for any column @@ -242,6 +247,7 @@ datafusion.execution.parquet.statistics_enabled NULL Sets if statistics are enab datafusion.execution.parquet.write_batch_size 1024 Sets write_batch_size in bytes datafusion.execution.parquet.writer_version 1.0 Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system +datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. 
Defaults to the number of CPU cores on the system diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index 74968bb089d7..cc04c6227721 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -126,13 +126,14 @@ Dml: op=[Insert Into] table=[table_without_values] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan InsertExec: sink=MemoryTable (partitions=1) ---ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] -----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] -------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST] ---------CoalesceBatchesExec: target_batch_size=8192 -----------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 -------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 ---------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true +--CoalescePartitionsExec +----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] +------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] +--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST] +----------CoalesceBatchesExec: target_batch_size=8192 +------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 +--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index d1b73204e379..abbfa304be27 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -102,11 +102,18 @@ INSERT INTO single_file_test values (1, 2), (3, 4); ---- 2 +query II +INSERT INTO single_file_test values (4, 5), (6, 7); +---- +2 + query II select * from single_file_test; ---- 1 2 3 4 +4 5 +6 7 statement ok CREATE EXTERNAL TABLE @@ -215,13 +222,14 @@ Dml: op=[Insert Into] table=[table_without_values] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) ---ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] -----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] -------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST] ---------CoalesceBatchesExec: target_batch_size=8192 -----------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 -------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 ---------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true +--CoalescePartitionsExec +----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] 
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] +--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST] +----------CoalesceBatchesExec: target_batch_size=8192 +------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 +--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index cab1e5c3e4a9..a0451eed088a 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -77,6 +77,9 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics | +| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | +| datafusion.execution.max_parallel_ouput_files | 8 | This is the maximum number of output files being written in parallel. Higher values can potentially give faster write performance at the cost of higher peak memory consumption. | +| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. 
Higher values can potentially give faster write performance at the cost of higher peak memory consumption | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
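For reference, a usage sketch showing how the three new options can be set from a Rust program, assuming the `datafusion` and `tokio` crates at the version this patch targets; the keys are spelled exactly as registered above (including `max_parallel_ouput_files`), and the chosen values are arbitrary examples:

```rust
// Usage sketch only; assumes the `datafusion` and `tokio` crates.
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Roll to a new output file roughly every 1M rows (soft limit).
    ctx.sql("SET datafusion.execution.soft_max_rows_per_output_file = 1000000")
        .await?;
    // Write at most 4 output files concurrently.
    ctx.sql("SET datafusion.execution.max_parallel_ouput_files = 4")
        .await?;
    // Buffer at most 2 RecordBatches per in-flight output file.
    ctx.sql("SET datafusion.execution.max_buffered_batches_per_output_file = 2")
        .await?;

    // Subsequent writes issued through this context pick these values up; the
    // effective settings are visible via SHOW ALL when information_schema is enabled.
    Ok(())
}
```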