Support Copy To Partitioned Files #9240

Merged · 7 commits · Feb 19, 2024
25 changes: 25 additions & 0 deletions datafusion/common/src/file_options/mod.rs
@@ -97,6 +97,31 @@ impl StatementOptions {
maybe_option.map(|(_, v)| v)
}

/// Finds the partition_by option if it exists and parses it into a `Vec<String>`.
/// If the option doesn't exist, returns an empty `vec![]`.
/// E.g. (partition_by 'colA, colB, colC') -> `vec!['colA','colB','colC']`
pub fn take_partition_by(&mut self) -> Vec<String> {
let partition_by = self.take_str_option("partition_by");
match partition_by {
Some(part_cols) => {
let dequoted = part_cols
.chars()
.enumerate()
.filter(|(idx, c)| {
!((*idx == 0 || *idx == part_cols.len() - 1)
&& (*c == '\'' || *c == '"'))
})
.map(|(_idx, c)| c)
.collect::<String>();
dequoted
.split(',')
.map(|s| s.trim().replace("''", "'"))
.collect::<Vec<_>>()
}
None => vec![],
}
}

/// Infers the file_type given a target and arbitrary options.
/// If the options contain an explicit "format" option, that will be used.
/// Otherwise, attempt to infer file_type from the extension of target.
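
As a quick illustration of `take_partition_by` above (a sketch only; it assumes `StatementOptions::new` accepts a `Vec<(String, String)>` of option pairs):

use datafusion_common::file_options::StatementOptions;

// Sketch: how take_partition_by dequotes and splits the option value.
// Assumes StatementOptions::new(Vec<(String, String)>); adjust if the
// actual constructor differs.
let mut opts = StatementOptions::new(vec![(
    "partition_by".to_string(),
    "'colA, colB, colC'".to_string(),
)]);
// Outer quotes are stripped, the value is split on ',', each entry is
// trimmed, and doubled single quotes ('') collapse to a literal '.
assert_eq!(
    opts.take_partition_by(),
    vec!["colA".to_string(), "colB".to_string(), "colC".to_string()]
);
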
12 changes: 12 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
@@ -73,6 +73,9 @@ pub struct DataFrameWriteOptions {
/// Allows compression of CSV and JSON.
/// Not supported for parquet.
compression: CompressionTypeVariant,
/// Sets which columns should be used for hive-style partitioned writes by name.
/// Can be left as an empty `vec![]` for non-partitioned writes.
partition_by: Vec<String>,
Contributor:
I think we need a test for this new feature DataFrame::write_parquet

I took a look around and I didn't see any good existing tests sadly. This is what I found.

https://github.com/apache/arrow-datafusion/blob/4d389c2590370d85bfe3af77f5243d5b40f5a222/datafusion/core/src/datasource/physical_plan/parquet/mod.rs#L2070

I'll make a short PR to move those tests into the dataframe tests to make it more discoverable
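
A minimal sketch of what such a test might look like (hypothetical names; assumes the `tempfile` crate and a `write_parquet` signature taking `Option<WriterProperties>`):

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Hypothetical test sketch, not part of this diff: checks that
// DataFrame::write_parquet honors partition_by.
#[tokio::test]
async fn write_partitioned_parquet() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .sql("SELECT column1 AS id, column2 AS part FROM (VALUES (1, 'a'), (2, 'b'))")
        .await?;
    let tmp = tempfile::tempdir()?;
    df.write_parquet(
        tmp.path().to_str().unwrap(),
        DataFrameWriteOptions::new().with_partition_by(vec!["part".to_string()]),
        None,
    )
    .await?;
    // Hive-style partitioning should produce part=a/ and part=b/ subdirectories.
    assert!(tmp.path().join("part=a").exists());
    assert!(tmp.path().join("part=b").exists());
    Ok(())
}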

}

impl DataFrameWriteOptions {
@@ -82,6 +85,7 @@ impl DataFrameWriteOptions {
overwrite: false,
single_file_output: false,
compression: CompressionTypeVariant::UNCOMPRESSED,
partition_by: vec![],
}
}
/// Set the overwrite option to true or false
@@ -101,6 +105,12 @@
self.compression = compression;
self
}

/// Sets the partition_by columns for output partitioning
pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
self.partition_by = partition_by;
self
}
}

impl Default for DataFrameWriteOptions {
@@ -1176,6 +1186,7 @@
self.plan,
path.into(),
FileType::CSV,
options.partition_by,
copy_options,
)?
.build()?;
@@ -1219,6 +1230,7 @@
self.plan,
path.into(),
FileType::JSON,
options.partition_by,
copy_options,
)?
.build()?;
1 change: 1 addition & 0 deletions datafusion/core/src/dataframe/parquet.rs
@@ -73,6 +73,7 @@ impl DataFrame {
self.plan,
path.into(),
FileType::PARQUET,
options.partition_by,
copy_options,
)?
.build()?;
28 changes: 17 additions & 11 deletions datafusion/core/src/datasource/file_format/write/demux.rs
@@ -32,7 +32,7 @@ use arrow_array::cast::AsArray;
use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::cast::as_string_array;
use datafusion_common::DataFusionError;
use datafusion_common::{exec_datafusion_err, DataFusionError};

use datafusion_execution::TaskContext;

@@ -319,14 +319,20 @@ fn compute_partition_keys_by_row<'a>(
) -> Result<Vec<Vec<&'a str>>> {
let mut all_partition_values = vec![];

for (col, dtype) in partition_by.iter() {
// For the purposes of writing partitioned data, we can rely on schema inference
// to determine the type of the partition cols in order to provide a more ergonomic
// UI which does not require specifying DataTypes manually. So, we ignore the
// DataType within the partition_by array and infer the correct type from the
// batch schema instead.
let schema = rb.schema();
for (col, _) in partition_by.iter() {
let mut partition_values = vec![];
let col_array =
rb.column_by_name(col)
.ok_or(DataFusionError::Execution(format!(
"PartitionBy Column {} does not exist in source data!",
col
)))?;

let dtype = schema.field_with_name(col)?.data_type();
let col_array = rb.column_by_name(col).ok_or(exec_datafusion_err!(
"PartitionBy Column {} does not exist in source data! Got schema {schema}.",
Contributor: it can be shortened with exec_datafusion_err!

col
))?;

match dtype {
DataType::Utf8 => {
@@ -339,12 +345,12 @@ fn compute_partition_keys_by_row<'a>(
downcast_dictionary_array!(
col_array => {
let array = col_array.downcast_dict::<StringArray>()
.ok_or(DataFusionError::Execution(format!("it is not yet supported to write to hive partitions with datatype {}",
dtype)))?;
.ok_or(exec_datafusion_err!("it is not yet supported to write to hive partitions with datatype {}",
dtype))?;

for val in array.values() {
partition_values.push(
val.ok_or(DataFusionError::Execution(format!("Cannot partition by null value for column {}", col)))?
val.ok_or(exec_datafusion_err!("Cannot partition by null value for column {}", col))?
);
}
},
10 changes: 9 additions & 1 deletion datafusion/core/src/physical_planner.rs
@@ -568,6 +568,7 @@ impl DefaultPhysicalPlanner {
output_url,
file_format,
copy_options,
partition_by,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;
let parsed_url = ListingTableUrl::parse(output_url)?;
@@ -585,13 +586,20 @@
CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
};

// Note: the DataType passed here is ignored for the purposes of writing and inferred instead
Author:
The read path needs an explicit DataType defined for the partition cols so it knows what to cast to, but I realized that the write path can just infer the correct DataType from the RecordBatch schema.

This allows COPY to only specify partition columns by name and not have to worry about specifying the correct data type.

Contributor:
I agree this is a better UX -- thank you

// from the schema of the RecordBatch being written. This allows COPY statements to specify only
// the column name rather than column name + explicit data type.
let table_partition_cols = partition_by.iter()
.map(|s| (s.to_string(), arrow_schema::DataType::Null))
.collect::<Vec<_>>();

// Set file sink related options
let config = FileSinkConfig {
object_store_url,
table_paths: vec![parsed_url],
file_groups: vec![],
output_schema: Arc::new(schema),
table_partition_cols: vec![],
table_partition_cols,
overwrite: false,
file_type_writer_options
};
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -263,12 +263,14 @@
input: LogicalPlan,
output_url: String,
file_format: FileType,
partition_by: Vec<String>,
copy_options: CopyOptions,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url,
file_format,
partition_by,
copy_options,
})))
}
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/dml.rs
@@ -36,6 +36,8 @@ pub struct CopyTo {
pub output_url: String,
/// The file format to output (explicitly defined or inferred from file extension)
pub file_format: FileType,
/// Determines which, if any, columns should be used for hive-style partitioned writes
pub partition_by: Vec<String>,
/// Arbitrary options as tuples
pub copy_options: CopyOptions,
}
4 changes: 3 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
@@ -614,12 +614,13 @@ impl LogicalPlan {
input: _,
output_url,
file_format,
partition_by,
copy_options,
}) => Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(inputs.swap_remove(0)),
output_url: output_url.clone(),
file_format: file_format.clone(),

partition_by: partition_by.clone(),
copy_options: copy_options.clone(),
})),
LogicalPlan::Values(Values { schema, .. }) => {
@@ -1550,6 +1551,7 @@ impl LogicalPlan {
input: _,
output_url,
file_format,
partition_by: _,
copy_options,
}) => {
let op_str = match copy_options {
2 changes: 2 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
@@ -918,6 +918,7 @@ impl AsLogicalPlan for LogicalPlanNode {
input: Arc::new(input),
output_url: copy.output_url.clone(),
file_format: FileType::from_str(&copy.file_type)?,
partition_by: vec![],
copy_options,
},
))
@@ -1641,6 +1642,7 @@
output_url,
file_format,
copy_options,
partition_by: _,
Author:
Note that I did not add support for partition_by in proto. We should add a follow up ticket for this.

I don't believe this PR will break downstream systems like Ballista's handling of COPY, but it will silently ignore partition_by options until prost is updated.

Contributor:
filed #9248

}) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
input,
4 changes: 4 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -324,6 +324,7 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
input: Arc::new(input),
output_url: "test.csv".to_string(),
file_format: FileType::CSV,
partition_by: vec![],
copy_options: CopyOptions::SQLOptions(StatementOptions::from(&options)),
});

@@ -354,6 +355,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
input: Arc::new(input),
output_url: "test.parquet".to_string(),
file_format: FileType::PARQUET,
partition_by: vec![],
copy_options: CopyOptions::WriterOptions(Box::new(
FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)),
)),
@@ -402,6 +404,7 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
input: Arc::new(input),
output_url: "test.arrow".to_string(),
file_format: FileType::ARROW,
partition_by: vec![],
copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::Arrow(
ArrowWriterOptions::new(),
))),
@@ -447,6 +450,7 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
input: Arc::new(input),
output_url: "test.csv".to_string(),
file_format: FileType::CSV,
partition_by: vec![],
copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::CSV(
CsvWriterOptions::new(
writer_properties,
2 changes: 2 additions & 0 deletions datafusion/sql/src/statement.rs
@@ -718,13 +718,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

let mut statement_options = StatementOptions::new(options);
let file_format = statement_options.try_infer_file_type(&statement.target)?;
let partition_by = statement_options.take_partition_by();

let copy_options = CopyOptions::SQLOptions(statement_options);

Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: statement.target,
file_format,
partition_by,
copy_options,
}))
}
84 changes: 84 additions & 0 deletions datafusion/sqllogictest/test_files/copy.slt
@@ -25,6 +25,90 @@ COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compressi
----
2

# Copy to directory as partitioned files
query IT
COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' (format parquet, compression 'zstd(10)', partition_by 'col2');
----
2

# validate multiple partitioned parquet file output
statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table1/' PARTITIONED BY (col2);

query I?
select * from validate_partitioned_parquet order by col1, col2;
Contributor:
Could you also add a test for reading out of one of the partitions

Something like

select * from 'test_files/scratch/copy/partitioned_table1/col2=Foo'

To demonstrate that the output was actually partitioned ? I think this test would pass even if the partition columns were ignored

Author:
I added additional tests to copy.slt to verify this

----
1 Foo
2 Bar

# validate partition paths were actually generated
statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet_bar STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table1/col2=Bar';

query I
select * from validate_partitioned_parquet_bar order by col1;
----
2
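
For reference, the partitioned write above should produce a hive-style directory layout along these lines (file names illustrative):

test_files/scratch/copy/partitioned_table1/
    col2=Bar/
        <uuid>.parquet
    col2=Foo/
        <uuid>.parquet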

# Copy to directory as partitioned files
query ITT
COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/'
(format parquet, compression 'zstd(10)', partition_by 'column2, column3');
Author:
Anonymous columns get the name "columnX" based on their order in the VALUES clause. It would be nice to document this somewhere, though I did make sure it is relatively easy to discover this based on the error message if you get a column name wrong.

----
3
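
As the comment above notes, the anonymous VALUES columns can be referenced by their generated names, e.g. (illustrative, not part of the diff):

SELECT column2, column3 FROM (VALUES (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z'));
-- the generated names are column1, column2, column3, in VALUES order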

# validate multiple partitioned parquet file output
statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet2 STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table2/' PARTITIONED BY (column2, column3);

query I??
select * from validate_partitioned_parquet2 order by column1,column2,column3;
----
1 a x
2 b y
3 c z

statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet_a_x STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table2/column2=a/column3=x';

query I
select * from validate_partitioned_parquet_a_x order by column1;
----
1

statement ok
create table test ("'test'" varchar, "'test2'" varchar, "'test3'" varchar);

query TTT
insert into test VALUES ('a', 'x', 'aa'), ('b','y', 'bb'), ('c', 'z', 'cc')
----
3

query T
select "'test'" from test
----
a
b
c

# Note: to place a single ' inside a literal string, escape it by doubling it: ''
query TTT
copy test to 'test_files/scratch/copy/escape_quote' (format csv, partition_by '''test2'',''test3''')
----
3

statement ok
CREATE EXTERNAL TABLE validate_partitioned_escape_quote STORED AS CSV
LOCATION 'test_files/scratch/copy/escape_quote/' PARTITIONED BY ("'test2'", "'test3'");

# This triggers a panic (index out of bounds)
Author:
Trying to query this table, which is partitioned by a column name containing a single quote, panics with an index-out-of-bounds error.

Manually inspecting the CSV suggests the previous COPY statement worked.

As mentioned in the other thread, I'm not sure it makes sense to support ' in a partition path name. It will certainly get ugly if we try.

Contributor:
Ideally I think we wouldn't panic (perhaps we can generate a not supported error instead)

Given this PR doesn't seem to make the situation worse (or better) I don't think we need to fix it now. Instead I think we should file a ticket to address it as a follow on. I will do so

#query
#select * from validate_partitioned_escape_quote;

query TT
EXPLAIN COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compression 'zstd(10)');
----