Commit 38d5f75

Fix handling of nested leaf columns in parallel parquet writer (#8923)
* fix handling of nested columns
* lint
* add suggested tests
1 parent 2b218be commit 38d5f75
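
For context: a single nested Arrow column fans out into multiple parquet leaf columns, and the parallel writer uses one channel per leaf (per the comment added in this commit). The old code zipped the channel list against the top-level columns of the `RecordBatch`, so any nested field threw the pairing off. A minimal sketch of the leaf fan-out, using `arrow_schema` (an illustrative helper, not DataFusion's code; it ignores types like Map and Dictionary):

```rust
use arrow_schema::{DataType, Field, Fields, TimeUnit};

// Count how many parquet leaf columns an Arrow type flattens into.
fn count_leaves(dt: &DataType) -> usize {
    match dt {
        DataType::Struct(fields) => fields.iter().map(|f| count_leaves(f.data_type())).sum(),
        DataType::List(f) | DataType::LargeList(f) => count_leaves(f.data_type()),
        _ => 1, // primitive types map to a single leaf column
    }
}

fn main() {
    // struct<c0: timestamp, c1: int> — one top-level column, two leaves,
    // hence two writer channels, not one.
    let nested = DataType::Struct(Fields::from(vec![
        Field::new("c0", DataType::Timestamp(TimeUnit::Second, None), true),
        Field::new("c1", DataType::Int64, true),
    ]));
    assert_eq!(count_leaves(&nested), 2);
}
```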

6 files changed (+71 −20 lines)

datafusion/common/src/config.rs

Lines changed: 1 addition & 1 deletion

@@ -408,7 +408,7 @@ config_namespace! {
     /// parquet files by serializing them in parallel. Each column
     /// in each row group in each output file are serialized in parallel
     /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
-    pub allow_single_file_parallelism: bool, default = false
+    pub allow_single_file_parallelism: bool, default = true

     /// By default parallel parquet writer is tuned for minimum
     /// memory usage in a streaming execution plan. You may see
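
This flips the default on. If the extra memory use of parallel writes is unwanted, the option can be turned back off; a hedged sketch using the usual `SessionConfig`/`SessionContext` APIs (exact builder methods may differ by version):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Disable parallel single-file writes, e.g. to minimize peak memory.
    let config = SessionConfig::new().set_bool(
        "datafusion.execution.parquet.allow_single_file_parallelism",
        false,
    );
    let ctx = SessionContext::new_with_config(config);

    // The same knob is also reachable at runtime through SQL.
    ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true")
        .await?;
    Ok(())
}
```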

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 11 additions & 10 deletions

@@ -885,24 +885,25 @@ async fn send_arrays_to_col_writers(
     rb: &RecordBatch,
     schema: Arc<Schema>,
 ) -> Result<()> {
-    for (tx, array, field) in col_array_channels
-        .iter()
-        .zip(rb.columns())
-        .zip(schema.fields())
-        .map(|((a, b), c)| (a, b, c))
-    {
+    // Each leaf column has its own channel, increment next_channel for each leaf column sent.
+    let mut next_channel = 0;
+    for (array, field) in rb.columns().iter().zip(schema.fields()) {
         for c in compute_leaves(field, array)? {
-            tx.send(c).await.map_err(|_| {
-                DataFusionError::Internal("Unable to send array to writer!".into())
-            })?;
+            col_array_channels[next_channel]
+                .send(c)
+                .await
+                .map_err(|_| {
+                    DataFusionError::Internal("Unable to send array to writer!".into())
+                })?;
+            next_channel += 1;
         }
     }

     Ok(())
 }

 /// Spawns a tokio task which joins the parallel column writer tasks,
-/// and finalizes the row group.
+/// and finalizes the row group
 fn spawn_rg_join_and_finalize_task(
     column_writer_handles: Vec<JoinHandle<Result<ArrowColumnWriter>>>,
     rg_rows: usize,
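
The shape of the fix, isolated from the writer internals: index the channels by a running leaf counter instead of zipping them with top-level columns. An illustrative stand-in using tokio mpsc channels (names and values are made up; the real code sends the leaf values produced by `compute_leaves`):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Pretend column 0 is a struct that flattens into two leaves and
    // column 1 is a primitive with one leaf: three channels in total.
    let leaves_per_column: Vec<Vec<i32>> = vec![vec![10, 11], vec![20]];
    let n_leaves: usize = leaves_per_column.iter().map(Vec::len).sum();

    // One channel per leaf column.
    let (txs, mut rxs): (Vec<_>, Vec<_>) =
        (0..n_leaves).map(|_| mpsc::channel::<i32>(8)).unzip();

    // Advance next_channel for every leaf sent. Zipping channels against
    // the two top-level columns would have left the third channel starved.
    let mut next_channel = 0;
    for leaves in &leaves_per_column {
        for &leaf in leaves {
            txs[next_channel].send(leaf).await.expect("receiver alive");
            next_channel += 1;
        }
    }
    drop(txs); // close the senders so the receivers terminate

    // Every leaf writer received exactly one value.
    for rx in rxs.iter_mut() {
        assert!(rx.recv().await.is_some());
    }
}
```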

datafusion/sqllogictest/test_files/copy.slt

Lines changed: 52 additions & 2 deletions

@@ -64,6 +64,24 @@ select * from validate_parquet;
 1 Foo
 2 Bar

+query ?
+copy (values (struct(timestamp '2021-01-01 01:00:01', 1)), (struct(timestamp '2022-01-01 01:00:01', 2)),
+(struct(timestamp '2023-01-03 01:00:01', 3)), (struct(timestamp '2024-01-01 01:00:01', 4)))
+to 'test_files/scratch/copy/table_nested2' (format parquet, single_file_output false);
+----
+4
+
+statement ok
+CREATE EXTERNAL TABLE validate_parquet_nested2 STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_nested2/';
+
+query ?
+select * from validate_parquet_nested2;
+----
+{c0: 2021-01-01T01:00:01, c1: 1}
+{c0: 2022-01-01T01:00:01, c1: 2}
+{c0: 2023-01-03T01:00:01, c1: 3}
+{c0: 2024-01-01T01:00:01, c1: 4}
+
 query ??
 COPY
 (values (struct ('foo', (struct ('foo', make_array(struct('a',1), struct('b',2))))), make_array(timestamp '2023-01-01 01:00:01',timestamp '2023-01-01 01:00:01')),

@@ -72,16 +90,48 @@ to 'test_files/scratch/copy/table_nested' (format parquet, single_file_output fa
 ----
 2

-# validate multiple parquet file output
 statement ok
-CREATE EXTERNAL TABLE validate_parquet_nested STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_nested/';
+CREATE EXTERNAL TABLE validate_parquet_nested STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/table_nested/';

 query ??
 select * from validate_parquet_nested;
 ----
 {c0: foo, c1: {c0: foo, c1: [{c0: a, c1: 1}, {c0: b, c1: 2}]}} [2023-01-01T01:00:01, 2023-01-01T01:00:01]
 {c0: bar, c1: {c0: foo, c1: [{c0: aa, c1: 10}, {c0: bb, c1: 20}]}} [2024-01-01T01:00:01, 2024-01-01T01:00:01]

+query ?
+copy (values ([struct('foo', 1), struct('bar', 2)]))
+to 'test_files/scratch/copy/array_of_struct/'
+(format parquet, single_file_output false);
+----
+1
+
+statement ok
+CREATE EXTERNAL TABLE validate_array_of_struct
+STORED AS PARQUET LOCATION 'test_files/scratch/copy/array_of_struct/';
+
+query ?
+select * from validate_array_of_struct;
+----
+[{c0: foo, c1: 1}, {c0: bar, c1: 2}]
+
+query ?
+copy (values (struct('foo', [1,2,3], struct('bar', [2,3,4]))))
+to 'test_files/scratch/copy/struct_with_array/'
+(format parquet, single_file_output false);
+----
+1
+
+statement ok
+CREATE EXTERNAL TABLE validate_struct_with_array
+STORED AS PARQUET LOCATION 'test_files/scratch/copy/struct_with_array/';
+
+query ?
+select * from validate_struct_with_array;
+----
+{c0: foo, c1: [1, 2, 3], c2: {c0: bar, c1: [2, 3, 4]}}
+

 # Copy parquet with all supported statment overrides
 query IT
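
The same regression scenario can be driven from Rust rather than sqllogictest; a sketch mirroring the first new test above (the path is illustrative):

```rust
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // COPY a nested struct column out to parquet, one file per partition.
    ctx.sql(
        "copy (values (struct(timestamp '2021-01-01 01:00:01', 1))) \
         to 'scratch/table_nested2' (format parquet, single_file_output false)",
    )
    .await?
    .collect() // execute the COPY; returns the row count
    .await?;
    Ok(())
}
```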

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 2 deletions

@@ -155,7 +155,7 @@ datafusion.execution.listing_table_ignore_subdirectory true
 datafusion.execution.max_buffered_batches_per_output_file 2
 datafusion.execution.meta_fetch_concurrency 32
 datafusion.execution.minimum_parallel_output_files 4
-datafusion.execution.parquet.allow_single_file_parallelism false
+datafusion.execution.parquet.allow_single_file_parallelism true
 datafusion.execution.parquet.bloom_filter_enabled false
 datafusion.execution.parquet.bloom_filter_fpp NULL
 datafusion.execution.parquet.bloom_filter_ndv NULL

@@ -232,7 +232,7 @@ datafusion.execution.listing_table_ignore_subdirectory true Should sub directori
 datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
 datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
 datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached.
-datafusion.execution.parquet.allow_single_file_parallelism false Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
+datafusion.execution.parquet.allow_single_file_parallelism true Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
 datafusion.execution.parquet.bloom_filter_enabled false Sets if bloom filter is enabled for any column
 datafusion.execution.parquet.bloom_filter_fpp NULL Sets bloom filter false positive probability. If NULL, uses default parquet writer setting
 datafusion.execution.parquet.bloom_filter_ndv NULL Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting
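
These expectations read the `information_schema.df_settings` view; checking the new default from Rust might look like this sketch (information_schema has to be enabled on the config):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);
    // Expected to print `true` for this setting after this commit.
    ctx.sql(
        "SELECT name, value FROM information_schema.df_settings \
         WHERE name = 'datafusion.execution.parquet.allow_single_file_parallelism'",
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```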

datafusion/sqllogictest/test_files/repartition_scan.slt

Lines changed: 4 additions & 4 deletions

@@ -61,7 +61,7 @@ Filter: parquet_table.column1 != Int32(42)
 physical_plan
 CoalesceBatchesExec: target_batch_size=8192
 --FilterExec: column1@0 != 42
-----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..153], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:153..306], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:306..459], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:459..610]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]
+----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..104], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:104..208], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:208..312], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:312..414]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]

 # disable round robin repartitioning
 statement ok

@@ -77,7 +77,7 @@ Filter: parquet_table.column1 != Int32(42)
 physical_plan
 CoalesceBatchesExec: target_batch_size=8192
 --FilterExec: column1@0 != 42
-----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..153], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:153..306], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:306..459], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:459..610]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]
+----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..104], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:104..208], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:208..312], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:312..414]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]

 # enable round robin repartitioning again
 statement ok

@@ -102,7 +102,7 @@ SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
 --SortExec: expr=[column1@0 ASC NULLS LAST]
 ----CoalesceBatchesExec: target_batch_size=8192
 ------FilterExec: column1@0 != 42
---------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:303..601, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..5], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:5..308], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:308..610]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]
+--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..205], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:205..405, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..5], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:5..210], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:210..414]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]


 ## Read the files as though they are ordered

@@ -138,7 +138,7 @@ physical_plan
 SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
 --CoalesceBatchesExec: target_batch_size=8192
 ----FilterExec: column1@0 != 42
-------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..300], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..305], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:305..610], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:300..601]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]
+------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..207], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:207..414], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:202..405]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]

 # Cleanup
 statement ok

docs/source/user-guide/configs.md

Lines changed: 1 addition & 1 deletion

@@ -71,7 +71,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.bloom_filter_enabled | false | Sets if bloom filter is enabled for any column |
 | datafusion.execution.parquet.bloom_filter_fpp | NULL | Sets bloom filter false positive probability. If NULL, uses default parquet writer setting |
 | datafusion.execution.parquet.bloom_filter_ndv | NULL | Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting |
-| datafusion.execution.parquet.allow_single_file_parallelism | false | Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. |
+| datafusion.execution.parquet.allow_single_file_parallelism | true | Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. |
 | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
 | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
 | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. |
