
Implement Hive-Style Partitioned Write Support #7801


Merged · 9 commits · Oct 20, 2023
5 changes: 0 additions & 5 deletions datafusion/common/src/config.rs
@@ -261,11 +261,6 @@ config_namespace! {
/// number of rows written is not roughly divisible by the soft max
pub soft_max_rows_per_output_file: usize, default = 50000000

/// This is the maximum number of output files being written
/// in parallel. Higher values can potentially give faster write
/// performance at the cost of higher peak memory consumption.
pub max_parallel_ouput_files: usize, default = 8

/// This is the maximum number of RecordBatches buffered
/// for each output file being worked. Higher values can potentially
/// give faster write performance at the cost of higher peak
25 changes: 15 additions & 10 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -23,7 +23,18 @@ use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use super::write::{stateless_append_all, stateless_multipart_put};
use arrow_array::RecordBatch;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};

use bytes::{Buf, Bytes};
use datafusion_physical_plan::metrics::MetricsSet;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
@@ -39,17 +50,8 @@ use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::{self, datatypes::SchemaRef};
use arrow_array::RecordBatch;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

/// Character Separated Value `FileFormat` implementation.
#[derive(Debug)]
@@ -485,6 +487,9 @@ impl CsvSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
if !self.config.table_partition_cols.is_empty() {
return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
}
let writer_options = self.config.file_type_writer_options.try_into_csv()?;
let (builder, compression) =
(&writer_options.writer_options, &writer_options.compression);
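Both CsvSink and JsonSink (further down in this diff) gain the same early exit: when the sink's config declares hive partition columns, append mode is rejected rather than silently writing rows that ignore the partition values. A minimal standalone sketch of that control flow; `SinkConfig`, `SinkError`, and `write_append` are hypothetical stand-ins for the real `FileSinkConfig` and append path, and the partition column type is simplified to a string:

```rust
// Sketch of the append-mode guard; the real code checks
// FileSinkConfig::table_partition_cols and returns DataFusionError::NotImplemented.

#[derive(Debug)]
enum SinkError {
    NotImplemented(String),
}

struct SinkConfig {
    /// Hive partition columns as (name, type) pairs; simplified to strings here.
    table_partition_cols: Vec<(String, String)>,
}

fn write_append(config: &SinkConfig) -> Result<u64, SinkError> {
    // Appending into an existing hive-partitioned layout is not supported yet,
    // so fail early before any data is written.
    if !config.table_partition_cols.is_empty() {
        return Err(SinkError::NotImplemented(
            "Inserting in append mode to hive style partitioned tables is not supported"
                .to_string(),
        ));
    }
    // ... the normal append path would run here and return the row count ...
    Ok(0)
}

fn main() {
    let partitioned = SinkConfig {
        table_partition_cols: vec![("year".to_string(), "Int32".to_string())],
    };
    assert!(write_append(&partitioned).is_err());
}
```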
43 changes: 28 additions & 15 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -23,32 +23,41 @@ use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;

use super::write::{stateless_append_all, stateless_multipart_put};
use super::{FileFormat, FileScanConfig};
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use arrow::json;
use arrow::json::reader::infer_json_schema_from_iterator;
use arrow::json::reader::ValueIter;
use arrow_array::RecordBatch;
use async_trait::async_trait;
use bytes::Buf;

use bytes::Bytes;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::PhysicalSortRequirement;
use datafusion_physical_plan::ExecutionPlan;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

use crate::datasource::physical_plan::FileGroupDisplay;
use crate::physical_plan::insert::DataSink;
use crate::physical_plan::insert::FileSinkExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

use super::write::orchestration::{stateless_append_all, stateless_multipart_put};

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, NdJsonExec};
use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Statistics,
};

use arrow::datatypes::{Schema, SchemaRef};
use arrow::json;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow_array::RecordBatch;
use datafusion_common::{not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug)]
pub struct JsonFormat {
@@ -258,6 +267,10 @@ impl JsonSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
if !self.config.table_partition_cols.is_empty() {
return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
}

let writer_options = self.config.file_type_writer_options.try_into_json()?;
let compression = &writer_options.compression;

47 changes: 41 additions & 6 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -23,10 +23,15 @@ use std::fmt::Debug;
use std::io::Write;
use std::sync::Arc;

use super::write::{create_writer, start_demuxer_task, AbortableWrite, FileWriterMode};
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, AbortableWrite, FileWriterMode};
use super::{FileFormat, FileScanConfig};

use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
};
use crate::arrow::datatypes::DataType;
use crate::config::ConfigOptions;

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::get_col_stats;
use crate::datasource::physical_plan::{
@@ -42,8 +47,7 @@ use crate::physical_plan::{
Statistics,
};

use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array};
use arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use arrow::datatypes::{Fields, Schema, SchemaRef};
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
@@ -604,6 +608,31 @@ impl ParquetSink {
Self { config }
}

/// Converts table schema to writer schema, which may differ in the case
/// of hive style partitioning where some columns are removed from the
/// underlying files.
fn get_writer_schema(&self) -> Arc<Schema> {
if !self.config.table_partition_cols.is_empty() {
let schema = self.config.output_schema();
let partition_names: Vec<_> = self
.config
.table_partition_cols
.iter()
.map(|(s, _)| s)
.collect();
Arc::new(Schema::new(
schema
.fields()
.iter()
.filter(|f| !partition_names.contains(&f.name()))
.map(|f| (**f).clone())
.collect::<Vec<_>>(),
))
} else {
self.config.output_schema().clone()
}
}

/// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore
/// AsyncArrowWriters are used when individual parquet file serialization is not parallelized
async fn create_async_arrow_writer(
@@ -631,7 +660,7 @@ impl ParquetSink {
.map_err(DataFusionError::ObjectStore)?;
let writer = AsyncArrowWriter::try_new(
multipart_writer,
self.config.output_schema.clone(),
self.get_writer_schema(),
10485760,
Some(parquet_props),
)?;
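The writer above is now constructed with get_writer_schema() rather than the full output schema: hive partition values live in the directory names (for example year=2023/), so the partition columns are pruned from the schema of the parquet files themselves. A standalone sketch of the same pruning step using the arrow crate; the `writer_schema` helper and the example column names are illustrative, not part of this PR:

```rust
// Partition columns are encoded in the path, so drop them from the file schema.
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

fn writer_schema(table_schema: &Schema, partition_cols: &[String]) -> Arc<Schema> {
    Arc::new(Schema::new(
        table_schema
            .fields()
            .iter()
            .filter(|f| !partition_cols.contains(f.name()))
            .map(|f| (**f).clone())
            .collect::<Vec<_>>(),
    ))
}

fn main() {
    let table_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, true),
        Field::new("year", DataType::Int32, false), // partition column
    ]);
    let pruned = writer_schema(&table_schema, &["year".to_string()]);
    assert_eq!(pruned.fields().len(), 2); // "year" appears only in the path
}
```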
@@ -721,10 +750,16 @@ impl DataSink for ParquetSink {
.map(|r| r as u64);
}

let part_col = if !self.config.table_partition_cols.is_empty() {
Some(self.config.table_partition_cols.clone())
} else {
None
};

let (demux_task, mut file_stream_rx) = start_demuxer_task(
data,
context,
None,
part_col,
self.config.table_paths[0].clone(),
"parquet".into(),
self.config.single_file_output,
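With part_col set, the demultiplexer routes each distinct combination of partition values to its own hive-style directory under the table path. A conceptual sketch of the resulting path layout; `hive_partition_path` is a hypothetical helper and leaves out the file naming, object store writes, and channel plumbing that start_demuxer_task actually handles:

```rust
// Build a hive-style output path: <table_path>/<col>=<value>/.../<file_name>
fn hive_partition_path(
    table_path: &str,
    partition_values: &[(String, String)], // (column name, string-encoded value)
    file_name: &str,
) -> String {
    let mut path = table_path.trim_end_matches('/').to_string();
    for (col, val) in partition_values {
        path.push('/');
        path.push_str(&format!("{col}={val}"));
    }
    format!("{path}/{file_name}")
}

fn main() {
    let p = hive_partition_path(
        "s3://bucket/my_table",
        &[("year".into(), "2023".into()), ("month".into(), "10".into())],
        "part-0.parquet",
    );
    assert_eq!(p, "s3://bucket/my_table/year=2023/month=10/part-0.parquet");
}
```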