From e7e2dcf0254ddb05c40d29796ab6bd53b041de0b Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Fri, 15 Dec 2023 17:10:03 -0500 Subject: [PATCH 1/8] setup parquet.slt and port parquet_query test to it --- datafusion/core/tests/sql/parquet.rs | 26 ----------- .../sqllogictest/test_files/parquet.slt | 46 +++++++++++++++++++ 2 files changed, 46 insertions(+), 26 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/parquet.slt diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index 8f810a929df3..fe3bd36b9530 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -24,32 +24,6 @@ use tempfile::TempDir; use super::*; -#[tokio::test] -async fn parquet_query() { - let ctx = SessionContext::new(); - register_alltypes_parquet(&ctx).await; - // NOTE that string_col is actually a binary column and does not have the UTF8 logical type - // so we need an explicit cast - let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = [ - "+----+---------------------------+", - "| id | alltypes_plain.string_col |", - "+----+---------------------------+", - "| 4 | 0 |", - "| 5 | 1 |", - "| 6 | 0 |", - "| 7 | 1 |", - "| 2 | 0 |", - "| 3 | 1 |", - "| 0 | 0 |", - "| 1 | 1 |", - "+----+---------------------------+", - ]; - - assert_batches_eq!(expected, &actual); -} - #[tokio::test] /// Test that if sort order is specified in ListingOptions, the sort /// expressions make it all the way down to the ParquetExec diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt new file mode 100644 index 000000000000..3683e31f53da --- /dev/null +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +statement ok +CREATE EXTERNAL TABLE alltypes_plain ( + id INT NOT NULL, + bool_col BOOLEAN NOT NULL, + tinyint_col TINYINT NOT NULL, + smallint_col SMALLINT NOT NULL, + int_col INT NOT NULL, + bigint_col BIGINT NOT NULL, + float_col FLOAT NOT NULL, + double_col DOUBLE NOT NULL, + date_string_col BYTEA NOT NULL, + string_col VARCHAR NOT NULL, + timestamp_col TIMESTAMP NOT NULL, +) +STORED AS PARQUET +WITH HEADER ROW +LOCATION '../../parquet-testing/data/alltypes_plain.parquet' + +query IT +SELECT id, CAST(string_col AS varchar) FROM alltypes_plain +---- +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 From 3b981e25a1fef53eb26ec8520e17b8f1882d6e92 Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Fri, 15 Dec 2023 20:11:35 -0500 Subject: [PATCH 2/8] port parquet_with_sort_order_specified, but missing files --- datafusion/core/tests/sql/parquet.rs | 94 ---------------- .../sqllogictest/test_files/parquet.slt | 105 ++++++++++++++++++ 2 files changed, 105 insertions(+), 94 deletions(-) diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index fe3bd36b9530..cb8aec878c24 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -24,100 +24,6 @@ use tempfile::TempDir; use super::*; -#[tokio::test] -/// Test that if sort order is specified in ListingOptions, the sort -/// expressions make it all the way down to the ParquetExec -async fn parquet_with_sort_order_specified() { - let parquet_read_options = ParquetReadOptions::default(); - let session_config = SessionConfig::new().with_target_partitions(2); - - // The sort order is not specified - let options_no_sort = parquet_read_options.to_listing_options(&session_config); - - // The sort order is specified (not actually correct in this case) - let file_sort_order = [col("string_col"), col("int_col")] - .into_iter() - .map(|e| { - let ascending = true; - let nulls_first = false; - e.sort(ascending, nulls_first) - }) - .collect::>(); - - let options_sort = parquet_read_options - .to_listing_options(&session_config) - .with_file_sort_order(vec![file_sort_order]); - - // This string appears in ParquetExec if the output ordering is - // specified - let expected_output_ordering = - "output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST]"; - - // when sort not specified, should not appear in the explain plan - let num_files = 1; - assert_not_contains!( - run_query_with_options(options_no_sort, num_files).await, - expected_output_ordering - ); - - // when sort IS specified, SHOULD appear in the explain plan - let num_files = 1; - assert_contains!( - run_query_with_options(options_sort.clone(), num_files).await, - expected_output_ordering - ); - - // when sort IS specified, but there are too many files (greater - // than the number of partitions) sort should not appear - let num_files = 3; - assert_not_contains!( - run_query_with_options(options_sort, num_files).await, - expected_output_ordering - ); -} - -/// Runs a limit query against a parquet file that was registered from -/// options on num_files copies of all_types_plain.parquet -async fn run_query_with_options(options: ListingOptions, num_files: usize) -> String { - let ctx = SessionContext::new(); - - let testdata = datafusion::test_util::parquet_test_data(); - let file_path = format!("{testdata}/alltypes_plain.parquet"); - - // Create a directory of parquet files with names - // 0.parquet - // 1.parquet - let tmpdir = TempDir::new().unwrap(); - for i in 0..num_files { - let target_file = tmpdir.path().join(format!("{i}.parquet")); - println!("Copying {file_path} to {target_file:?}"); - std::fs::copy(&file_path, target_file).unwrap(); - } - - let provided_schema = None; - let sql_definition = None; - ctx.register_listing_table( - "t", - tmpdir.path().to_string_lossy(), - options.clone(), - provided_schema, - sql_definition, - ) - .await - .unwrap(); - - let batches = ctx.sql("explain select int_col, string_col from t order by string_col, int_col limit 10") - .await - .expect("planing worked") - .collect() - .await - .expect("execution worked"); - - arrow::util::pretty::pretty_format_batches(&batches) - .unwrap() - .to_string() -} - #[tokio::test] async fn fixed_size_binary_columns() { let ctx = SessionContext::new(); diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 3683e31f53da..c0ceba300c7e 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -15,6 +15,10 @@ # specific language governing permissions and limitations # under the License. +# TESTS FOR PARQUET FILES + +# Setup basic alltypes_plain table: + statement ok CREATE EXTERNAL TABLE alltypes_plain ( id INT NOT NULL, @@ -33,6 +37,8 @@ STORED AS PARQUET WITH HEADER ROW LOCATION '../../parquet-testing/data/alltypes_plain.parquet' +# Test a basic query: + query IT SELECT id, CAST(string_col AS varchar) FROM alltypes_plain ---- @@ -44,3 +50,102 @@ SELECT id, CAST(string_col AS varchar) FROM alltypes_plain 3 1 0 0 1 1 + +# Explain query on the un-ordered table, expect no "output_ordering" clause in physical_plan -> ParquetExec: + +query TT +EXPLAIN SELECT int_col, string_col +FROM alltypes_plain +ORDER BY string_col, int_col +LIMIT 10 +---- +logical_plan +Limit: skip=0, fetch=10 +--Sort: alltypes_plain.string_col ASC NULLS LAST, alltypes_plain.int_col ASC NULLS LAST, fetch=10 +----TableScan: alltypes_plain projection=[int_col, string_col] +physical_plan +GlobalLimitExec: skip=0, fetch=10 +--SortExec: TopK(fetch=10), expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] +----ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[int_col, string_col] + +# Setup alltypes_plain, with an order clause: + +statement ok +CREATE EXTERNAL TABLE alltypes_plain_with_order ( + id INT NOT NULL, + bool_col BOOLEAN NOT NULL, + tinyint_col TINYINT NOT NULL, + smallint_col SMALLINT NOT NULL, + int_col INT NOT NULL, + bigint_col BIGINT NOT NULL, + float_col FLOAT NOT NULL, + double_col DOUBLE NOT NULL, + date_string_col BYTEA NOT NULL, + string_col VARCHAR NOT NULL, + timestamp_col TIMESTAMP NOT NULL, +) +STORED AS PARQUET +WITH HEADER ROW +WITH ORDER (string_col ASC NULLS LAST, int_col NULLS LAST) +LOCATION '../../parquet-testing/data/alltypes_plain.parquet' + +# Explain query on the ordered table, expect to see the "output_ordering" clause in physical_plan -> ParquetExec: + +query TT +EXPLAIN SELECT int_col, string_col +FROM alltypes_plain_with_order +ORDER BY string_col, int_col +LIMIT 10 +---- +logical_plan +Limit: skip=0, fetch=10 +--Sort: alltypes_plain_with_order.string_col ASC NULLS LAST, alltypes_plain_with_order.int_col ASC NULLS LAST, fetch=10 +----TableScan: alltypes_plain_with_order projection=[int_col, string_col] +physical_plan +GlobalLimitExec: skip=0, fetch=10 +--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST] + +# Setup alltypes_plain, from the directory, with ordering clause: + +statement ok +CREATE EXTERNAL TABLE alltypes_plain_from_dir ( + id INT NOT NULL, + bool_col BOOLEAN NOT NULL, + tinyint_col TINYINT NOT NULL, + smallint_col SMALLINT NOT NULL, + int_col INT NOT NULL, + bigint_col BIGINT NOT NULL, + float_col FLOAT NOT NULL, + double_col DOUBLE NOT NULL, + date_string_col BYTEA NOT NULL, + string_col VARCHAR NOT NULL, + timestamp_col TIMESTAMP NOT NULL, +) +STORED AS PARQUET +WITH HEADER ROW +WITH ORDER (string_col ASC NULLS LAST, int_col NULLS LAST) +PARTITIONED BY (string_col, int_col) +LOCATION '../../parquet-testing/data/alltypes_dir' + + +# Explain query on ordered table from directory source; the directory contains 5 files, which is one more than +# the hard-coded number of target partitions (4) used by the sqllogictest runner. In this case, we expect the "output_ordering" +# clause to _not_ be present in physical_plan -> ParquetExec, due to there being more files than partitions + +query TT +EXPLAIN SELECT int_col, string_col +FROM alltypes_plain_from_dir +ORDER BY string_col, int_col +LIMIT 10 +---- +logical_plan +Limit: skip=0, fetch=10 +--Sort: alltypes_plain_from_dir.string_col ASC NULLS LAST, alltypes_plain_from_dir.int_col ASC NULLS LAST, fetch=10 +----Projection: alltypes_plain_from_dir.int_col, alltypes_plain_from_dir.string_col +------TableScan: alltypes_plain_from_dir projection=[string_col, int_col] +physical_plan +GlobalLimitExec: skip=0, fetch=10 +--SortPreservingMergeExec: [string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST], fetch=10 +----SortExec: TopK(fetch=10), expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] +------ProjectionExec: expr=[int_col@1 as int_col, string_col@0 as string_col] +--------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/0.parquet, WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/1.parquet], [WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/2.parquet, WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/3.parquet], [WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/4.parquet]]}, projection=[string_col, int_col] From 5916b2786d6ee6d033772da65a5d356454df7d1d Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Sat, 16 Dec 2023 10:51:18 -0500 Subject: [PATCH 3/8] port fixed_size_binary_columns test --- datafusion/core/tests/sql/parquet.rs | 20 -------------- .../sqllogictest/test_files/parquet.slt | 27 +++++++++++++++++++ 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index cb8aec878c24..3caf0984ec82 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -18,31 +18,11 @@ use std::{fs, path::Path}; use ::parquet::arrow::ArrowWriter; -use datafusion::{datasource::listing::ListingOptions, execution::options::ReadOptions}; use datafusion_common::cast::{as_list_array, as_primitive_array, as_string_array}; use tempfile::TempDir; use super::*; -#[tokio::test] -async fn fixed_size_binary_columns() { - let ctx = SessionContext::new(); - ctx.register_parquet( - "t0", - "tests/data/test_binary.parquet", - ParquetReadOptions::default(), - ) - .await - .unwrap(); - let sql = "SELECT ids FROM t0 ORDER BY ids"; - let dataframe = ctx.sql(sql).await.unwrap(); - let results = dataframe.collect().await.unwrap(); - for batch in results { - assert_eq!(466, batch.num_rows()); - assert_eq!(1, batch.num_columns()); - } -} - #[tokio::test] async fn window_fn_timestamp_tz() { let ctx = SessionContext::new(); diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index c0ceba300c7e..62a90a38f0b1 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -149,3 +149,30 @@ GlobalLimitExec: skip=0, fetch=10 ----SortExec: TopK(fetch=10), expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] ------ProjectionExec: expr=[int_col@1 as int_col, string_col@0 as string_col] --------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/0.parquet, WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/1.parquet], [WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/2.parquet, WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/3.parquet], [WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/4.parquet]]}, projection=[string_col, int_col] + +# Perform SELECT on table with fixed sized binary columns + +statement ok +CREATE EXTERNAL TABLE test_binary +STORED AS PARQUET +WITH HEADER ROW +LOCATION '../core/tests/data/test_binary.parquet' + +query ? +SELECT ids FROM test_binary ORDER BY ids LIMIT 10 +---- +008c7196f68089ab692e4739c5fd16b5 +00a51a7bc5ff8eb1627f8f3dc959dce8 +0166ce1d46129ad104fa4990c6057c91 +03a4893f3285b422820b4cd74c9b9786 +04999ac861e14682cd339eae2cc74359 +04b86bf8f228739fde391f850636a77d +050fb9cf722a709eb94b70b3ee7dc342 +052578a65e8e91b8526b182d40e846e8 +05408e6a403e4296526006e20cc4a45a +0592e6fb7d7169b888a4029b53abb701 + +query I +SELECT count(ids) FROM test_binary +---- +466 From 77de3f60a9896a37787963d7783d67cb387b7aac Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Sat, 16 Dec 2023 22:48:10 -0500 Subject: [PATCH 4/8] port window_fn_timestamp_tz test --- datafusion/core/tests/sql/parquet.rs | 33 ----------------- .../sqllogictest/test_files/parquet.slt | 37 ++++++++++++++++++- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index 3caf0984ec82..277441c36584 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -23,39 +23,6 @@ use tempfile::TempDir; use super::*; -#[tokio::test] -async fn window_fn_timestamp_tz() { - let ctx = SessionContext::new(); - ctx.register_parquet( - "t0", - "tests/data/timestamp_with_tz.parquet", - ParquetReadOptions::default(), - ) - .await - .unwrap(); - - let sql = "SELECT count, LAG(timestamp, 1) OVER (ORDER BY timestamp) FROM t0"; - let dataframe = ctx.sql(sql).await.unwrap(); - let results = dataframe.collect().await.unwrap(); - - let mut num_rows = 0; - for batch in results { - num_rows += batch.num_rows(); - assert_eq!(2, batch.num_columns()); - - let ty = batch.column(0).data_type().clone(); - assert_eq!(DataType::Int64, ty); - - let ty = batch.column(1).data_type().clone(); - assert_eq!( - DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())), - ty - ); - } - - assert_eq!(131072, num_rows); -} - #[tokio::test] async fn parquet_single_nan_schema() { let ctx = SessionContext::new(); diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 62a90a38f0b1..23cff9d1f48d 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -158,6 +158,13 @@ STORED AS PARQUET WITH HEADER ROW LOCATION '../core/tests/data/test_binary.parquet' +# Check size of table: + +query I +SELECT count(ids) FROM test_binary +---- +466 + query ? SELECT ids FROM test_binary ORDER BY ids LIMIT 10 ---- @@ -172,7 +179,33 @@ SELECT ids FROM test_binary ORDER BY ids LIMIT 10 05408e6a403e4296526006e20cc4a45a 0592e6fb7d7169b888a4029b53abb701 +# Perform a query with a window function and timestamp data: + +statement ok +CREATE EXTERNAL TABLE timestamp_with_tz +STORED AS PARQUET +WITH HEADER ROW +LOCATION '../core/tests/data/timestamp_with_tz.parquet' + +# Check size of table: + query I -SELECT count(ids) FROM test_binary +SELECT COUNT(*) FROM timestamp_with_tz ---- -466 +131072 + +query IP +SELECT count, LAG(timestamp, 1) OVER (ORDER BY timestamp) +FROM timestamp_with_tz +LIMIT 10 +---- +0 NULL +0 2014-08-27T14:00:00Z +0 2014-08-27T14:00:00Z +4 2014-08-27T14:00:00Z +0 2014-08-27T14:00:00Z +0 2014-08-27T14:00:00Z +0 2014-08-27T14:00:00Z +14 2014-08-27T14:00:00Z +0 2014-08-27T14:00:00Z +0 2014-08-27T14:00:00Z From c4fd83054e33544bc2ed87cca3b7ba09c1ab1d5b Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Sat, 16 Dec 2023 23:06:40 -0500 Subject: [PATCH 5/8] port parquet_single_nan_schema test --- datafusion/core/tests/sql/parquet.rs | 20 ------------------- .../sqllogictest/test_files/parquet.slt | 20 +++++++++++++++++++ 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index 277441c36584..f928e220554b 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -23,26 +23,6 @@ use tempfile::TempDir; use super::*; -#[tokio::test] -async fn parquet_single_nan_schema() { - let ctx = SessionContext::new(); - let testdata = datafusion::test_util::parquet_test_data(); - ctx.register_parquet( - "single_nan", - &format!("{testdata}/single_nan.parquet"), - ParquetReadOptions::default(), - ) - .await - .unwrap(); - let sql = "SELECT mycol FROM single_nan"; - let dataframe = ctx.sql(sql).await.unwrap(); - let results = dataframe.collect().await.unwrap(); - for batch in results { - assert_eq!(1, batch.num_rows()); - assert_eq!(1, batch.num_columns()); - } -} - #[tokio::test] #[ignore = "Test ignored, will be enabled as part of the nested Parquet reader"] async fn parquet_list_columns() { diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 23cff9d1f48d..eb6ee1d5ad1b 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -209,3 +209,23 @@ LIMIT 10 14 2014-08-27T14:00:00Z 0 2014-08-27T14:00:00Z 0 2014-08-27T14:00:00Z + +# Test a query from the single_nan data set: + +statement ok +CREATE EXTERNAL TABLE single_nan +STORED AS PARQUET +WITH HEADER ROW +LOCATION '../../parquet-testing/data/single_nan.parquet' + +# Check table size: + +query I +SELECT COUNT(*) FROM single_nan +---- +1 + +query R +SELECT mycol FROM single_nan +---- +NULL From 5dec84a2e7a4a1a2be0f7348842fb0956f99a1dd Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Sat, 16 Dec 2023 23:42:59 -0500 Subject: [PATCH 6/8] port parquet_query_with_max_min test --- datafusion/core/tests/sql/parquet.rs | 95 ------------------- .../sqllogictest/test_files/parquet.slt | 22 +++++ 2 files changed, 22 insertions(+), 95 deletions(-) diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index f928e220554b..30c49408d2bb 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -93,98 +93,3 @@ async fn parquet_list_columns() { assert_eq!(result.value(2), "hij"); assert_eq!(result.value(3), "xyz"); } - -#[tokio::test] -async fn parquet_query_with_max_min() { - let tmp_dir = TempDir::new().unwrap(); - let table_dir = tmp_dir.path().join("parquet_test"); - let table_path = Path::new(&table_dir); - - let fields = vec![ - Field::new("c1", DataType::Int32, true), - Field::new("c2", DataType::Utf8, true), - Field::new("c3", DataType::Int64, true), - Field::new("c4", DataType::Date32, true), - ]; - - let schema = Arc::new(Schema::new(fields.clone())); - - if let Ok(()) = fs::create_dir(table_path) { - let filename = "foo.parquet"; - let path = table_path.join(filename); - let file = fs::File::create(path).unwrap(); - let mut writer = - ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None) - .unwrap(); - - // create mock record batch - let c1s = Arc::new(Int32Array::from(vec![1, 2, 3])); - let c2s = Arc::new(StringArray::from(vec!["aaa", "bbb", "ccc"])); - let c3s = Arc::new(Int64Array::from(vec![100, 200, 300])); - let c4s = Arc::new(Date32Array::from(vec![Some(1), Some(2), Some(3)])); - let rec_batch = - RecordBatch::try_new(schema.clone(), vec![c1s, c2s, c3s, c4s]).unwrap(); - - writer.write(&rec_batch).unwrap(); - writer.close().unwrap(); - } - - // query parquet - let ctx = SessionContext::new(); - - ctx.register_parquet( - "foo", - &format!("{}/foo.parquet", table_dir.to_str().unwrap()), - ParquetReadOptions::default(), - ) - .await - .unwrap(); - - let sql = "SELECT max(c1) FROM foo"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = [ - "+-------------+", - "| MAX(foo.c1) |", - "+-------------+", - "| 3 |", - "+-------------+", - ]; - - assert_batches_eq!(expected, &actual); - - let sql = "SELECT min(c2) FROM foo"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = [ - "+-------------+", - "| MIN(foo.c2) |", - "+-------------+", - "| aaa |", - "+-------------+", - ]; - - assert_batches_eq!(expected, &actual); - - let sql = "SELECT max(c3) FROM foo"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = [ - "+-------------+", - "| MAX(foo.c3) |", - "+-------------+", - "| 300 |", - "+-------------+", - ]; - - assert_batches_eq!(expected, &actual); - - let sql = "SELECT min(c4) FROM foo"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = [ - "+-------------+", - "| MIN(foo.c4) |", - "+-------------+", - "| 1970-01-02 |", - "+-------------+", - ]; - - assert_batches_eq!(expected, &actual); -} diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index eb6ee1d5ad1b..1e0d9cc865ab 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -229,3 +229,25 @@ query R SELECT mycol FROM single_nan ---- NULL + +# Perform queries using MIN and MAX + +query I +SELECT max(int_col) FROM alltypes_plain +---- +1 + +query T +SELECT min(string_col) FROM alltypes_plain +---- +0 + +query I +SELECT max(bigint_col) FROM alltypes_plain +---- +10 + +query P +SELECT min(timestamp_col) FROM alltypes_plain +---- +2009-01-01T00:00:00 From 195454d991e86340aa914e5e1b1fea06ebf301cd Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Sun, 17 Dec 2023 17:24:12 -0500 Subject: [PATCH 7/8] use COPY to create tables in parquet.slt to test partitioning over multi-file data --- datafusion/core/tests/sql/parquet.rs | 4 - .../sqllogictest/test_files/parquet.slt | 298 ++++++++++-------- 2 files changed, 175 insertions(+), 127 deletions(-) diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index 30c49408d2bb..f80a28f7e4f9 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -15,11 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{fs, path::Path}; - -use ::parquet::arrow::ArrowWriter; use datafusion_common::cast::{as_list_array, as_primitive_array, as_string_array}; -use tempfile::TempDir; use super::*; diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 1e0d9cc865ab..a0ee3d967b26 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -17,99 +17,170 @@ # TESTS FOR PARQUET FILES -# Setup basic alltypes_plain table: +# Set 2 partitions for deterministic output plans +statement ok +set datafusion.execution.target_partitions = 2; +# Automatically partition files over 1 byte statement ok -CREATE EXTERNAL TABLE alltypes_plain ( - id INT NOT NULL, - bool_col BOOLEAN NOT NULL, - tinyint_col TINYINT NOT NULL, - smallint_col SMALLINT NOT NULL, - int_col INT NOT NULL, - bigint_col BIGINT NOT NULL, - float_col FLOAT NOT NULL, - double_col DOUBLE NOT NULL, - date_string_col BYTEA NOT NULL, - string_col VARCHAR NOT NULL, - timestamp_col TIMESTAMP NOT NULL, -) -STORED AS PARQUET -WITH HEADER ROW -LOCATION '../../parquet-testing/data/alltypes_plain.parquet' +set datafusion.optimizer.repartition_file_min_size = 1 -# Test a basic query: +# Create a table as a data source +statement ok +CREATE TABLE src_table ( + int_col INT, + string_col TEXT, + bigint_col BIGINT, + date_col DATE +) AS VALUES +(1, 'aaa', 100, 1), +(2, 'bbb', 200, 2), +(3, 'ccc', 300, 3), +(4, 'ddd', 400, 4), +(5, 'eee', 500, 5), +(6, 'fff', 600, 6), +(7, 'ggg', 700, 7), +(8, 'hhh', 800, 8), +(9, 'iii', 900, 9); + +# Setup 2 files, i.e., as many as there are partitions: + +# File 1: +query ITID +COPY (SELECT * FROM src_table LIMIT 3) +TO 'test_files/scratch/parquet/test_table/0.parquet' +(FORMAT PARQUET, SINGLE_FILE_OUTPUT true); +---- +3 -query IT -SELECT id, CAST(string_col AS varchar) FROM alltypes_plain +# File 2: +query ITID +COPY (SELECT * FROM src_table WHERE int_col > 3 LIMIT 3) +TO 'test_files/scratch/parquet/test_table/1.parquet' +(FORMAT PARQUET, SINGLE_FILE_OUTPUT true); ---- -4 0 -5 1 -6 0 -7 1 -2 0 -3 1 -0 0 -1 1 +3 -# Explain query on the un-ordered table, expect no "output_ordering" clause in physical_plan -> ParquetExec: +# Create a table from generated parquet files, without ordering: +statement ok +CREATE EXTERNAL TABLE test_table ( + int_col INT, + string_col TEXT, + bigint_col BIGINT, + date_col DATE +) +STORED AS PARQUET +WITH HEADER ROW +LOCATION 'test_files/scratch/parquet/test_table'; +# Basic query: +query ITID +SELECT * FROM test_table ORDER BY int_col; +---- +1 aaa 100 1970-01-02 +2 bbb 200 1970-01-03 +3 ccc 300 1970-01-04 +4 ddd 400 1970-01-05 +5 eee 500 1970-01-06 +6 fff 600 1970-01-07 + +# Check output plan, expect no "output_ordering" clause in the physical_plan -> ParquetExec: query TT EXPLAIN SELECT int_col, string_col -FROM alltypes_plain -ORDER BY string_col, int_col -LIMIT 10 +FROM test_table +ORDER BY string_col, int_col; ---- logical_plan -Limit: skip=0, fetch=10 ---Sort: alltypes_plain.string_col ASC NULLS LAST, alltypes_plain.int_col ASC NULLS LAST, fetch=10 -----TableScan: alltypes_plain projection=[int_col, string_col] +Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST +--TableScan: test_table projection=[int_col, string_col] physical_plan -GlobalLimitExec: skip=0, fetch=10 ---SortExec: TopK(fetch=10), expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] -----ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[int_col, string_col] +SortPreservingMergeExec: [string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] +--SortExec: expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] +----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet]]}, projection=[int_col, string_col] -# Setup alltypes_plain, with an order clause: +# Tear down test_table: +statement ok +DROP TABLE test_table; +# Create test_table again, but with ordering: statement ok -CREATE EXTERNAL TABLE alltypes_plain_with_order ( - id INT NOT NULL, - bool_col BOOLEAN NOT NULL, - tinyint_col TINYINT NOT NULL, - smallint_col SMALLINT NOT NULL, - int_col INT NOT NULL, - bigint_col BIGINT NOT NULL, - float_col FLOAT NOT NULL, - double_col DOUBLE NOT NULL, - date_string_col BYTEA NOT NULL, - string_col VARCHAR NOT NULL, - timestamp_col TIMESTAMP NOT NULL, +CREATE EXTERNAL TABLE test_table ( + int_col INT, + string_col TEXT, + bigint_col BIGINT, + date_col DATE ) STORED AS PARQUET WITH HEADER ROW -WITH ORDER (string_col ASC NULLS LAST, int_col NULLS LAST) -LOCATION '../../parquet-testing/data/alltypes_plain.parquet' +WITH ORDER (string_col ASC NULLS LAST, int_col ASC NULLS LAST) +LOCATION 'test_files/scratch/parquet/test_table'; -# Explain query on the ordered table, expect to see the "output_ordering" clause in physical_plan -> ParquetExec: +# Check output plan, expect an "output_ordering" clause in the physical_plan -> ParquetExec: +query TT +EXPLAIN SELECT int_col, string_col +FROM test_table +ORDER BY string_col, int_col; +---- +logical_plan +Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST +--TableScan: test_table projection=[int_col, string_col] +physical_plan +SortPreservingMergeExec: [string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] +--ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST] + +# Add another file to the directory underlying test_table +query ITID +COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3) +TO 'test_files/scratch/parquet/test_table/2.parquet' +(FORMAT PARQUET, SINGLE_FILE_OUTPUT true); +---- +3 +# Check output plan again, expect no "output_ordering" clause in the physical_plan -> ParquetExec, +# due to there being more files than partitions: query TT EXPLAIN SELECT int_col, string_col -FROM alltypes_plain_with_order -ORDER BY string_col, int_col -LIMIT 10 +FROM test_table +ORDER BY string_col, int_col; ---- logical_plan -Limit: skip=0, fetch=10 ---Sort: alltypes_plain_with_order.string_col ASC NULLS LAST, alltypes_plain_with_order.int_col ASC NULLS LAST, fetch=10 -----TableScan: alltypes_plain_with_order projection=[int_col, string_col] +Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST +--TableScan: test_table projection=[int_col, string_col] physical_plan -GlobalLimitExec: skip=0, fetch=10 ---ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST] +SortPreservingMergeExec: [string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] +--SortExec: expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] +----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col] -# Setup alltypes_plain, from the directory, with ordering clause: +# Perform queries using MIN and MAX +query I +SELECT max(int_col) FROM test_table; +---- +9 + +query T +SELECT min(string_col) FROM test_table; +---- +aaa + +query I +SELECT max(bigint_col) FROM test_table; +---- +900 + +query D +SELECT min(date_col) FROM test_table; +---- +1970-01-02 + +# Clean up +statement ok +DROP TABLE test_table; + +# Setup alltypes_plain table: statement ok -CREATE EXTERNAL TABLE alltypes_plain_from_dir ( - id INT NOT NULL, +CREATE EXTERNAL TABLE alltypes_plain ( + id INT NOT NULL, bool_col BOOLEAN NOT NULL, tinyint_col TINYINT NOT NULL, smallint_col SMALLINT NOT NULL, @@ -123,32 +194,24 @@ CREATE EXTERNAL TABLE alltypes_plain_from_dir ( ) STORED AS PARQUET WITH HEADER ROW -WITH ORDER (string_col ASC NULLS LAST, int_col NULLS LAST) -PARTITIONED BY (string_col, int_col) -LOCATION '../../parquet-testing/data/alltypes_dir' - - -# Explain query on ordered table from directory source; the directory contains 5 files, which is one more than -# the hard-coded number of target partitions (4) used by the sqllogictest runner. In this case, we expect the "output_ordering" -# clause to _not_ be present in physical_plan -> ParquetExec, due to there being more files than partitions +LOCATION '../../parquet-testing/data/alltypes_plain.parquet' -query TT -EXPLAIN SELECT int_col, string_col -FROM alltypes_plain_from_dir -ORDER BY string_col, int_col -LIMIT 10 +# Test a basic query with a CAST: +query IT +SELECT id, CAST(string_col AS varchar) FROM alltypes_plain ---- -logical_plan -Limit: skip=0, fetch=10 ---Sort: alltypes_plain_from_dir.string_col ASC NULLS LAST, alltypes_plain_from_dir.int_col ASC NULLS LAST, fetch=10 -----Projection: alltypes_plain_from_dir.int_col, alltypes_plain_from_dir.string_col -------TableScan: alltypes_plain_from_dir projection=[string_col, int_col] -physical_plan -GlobalLimitExec: skip=0, fetch=10 ---SortPreservingMergeExec: [string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST], fetch=10 -----SortExec: TopK(fetch=10), expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] -------ProjectionExec: expr=[int_col@1 as int_col, string_col@0 as string_col] ---------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/0.parquet, WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/1.parquet], [WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/2.parquet, WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/3.parquet], [WORKSPACE_ROOT/parquet-testing/data/alltypes_dir/4.parquet]]}, projection=[string_col, int_col] +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 + +# Clean up +statement ok +DROP TABLE alltypes_plain; # Perform SELECT on table with fixed sized binary columns @@ -156,17 +219,17 @@ statement ok CREATE EXTERNAL TABLE test_binary STORED AS PARQUET WITH HEADER ROW -LOCATION '../core/tests/data/test_binary.parquet' +LOCATION '../core/tests/data/test_binary.parquet'; # Check size of table: - query I -SELECT count(ids) FROM test_binary +SELECT count(ids) FROM test_binary; ---- 466 +# Do the SELECT query: query ? -SELECT ids FROM test_binary ORDER BY ids LIMIT 10 +SELECT ids FROM test_binary ORDER BY ids LIMIT 10; ---- 008c7196f68089ab692e4739c5fd16b5 00a51a7bc5ff8eb1627f8f3dc959dce8 @@ -179,25 +242,29 @@ SELECT ids FROM test_binary ORDER BY ids LIMIT 10 05408e6a403e4296526006e20cc4a45a 0592e6fb7d7169b888a4029b53abb701 +# Clean up +statement ok +DROP TABLE test_binary; + # Perform a query with a window function and timestamp data: statement ok CREATE EXTERNAL TABLE timestamp_with_tz STORED AS PARQUET WITH HEADER ROW -LOCATION '../core/tests/data/timestamp_with_tz.parquet' +LOCATION '../core/tests/data/timestamp_with_tz.parquet'; # Check size of table: - query I -SELECT COUNT(*) FROM timestamp_with_tz +SELECT COUNT(*) FROM timestamp_with_tz; ---- 131072 +# Perform the query: query IP SELECT count, LAG(timestamp, 1) OVER (ORDER BY timestamp) FROM timestamp_with_tz -LIMIT 10 +LIMIT 10; ---- 0 NULL 0 2014-08-27T14:00:00Z @@ -210,44 +277,29 @@ LIMIT 10 0 2014-08-27T14:00:00Z 0 2014-08-27T14:00:00Z -# Test a query from the single_nan data set: +# Clean up +statement ok +DROP TABLE timestamp_with_tz; +# Test a query from the single_nan data set: statement ok CREATE EXTERNAL TABLE single_nan STORED AS PARQUET WITH HEADER ROW -LOCATION '../../parquet-testing/data/single_nan.parquet' +LOCATION '../../parquet-testing/data/single_nan.parquet'; # Check table size: - query I -SELECT COUNT(*) FROM single_nan +SELECT COUNT(*) FROM single_nan; ---- 1 +# Query for the single NULL: query R -SELECT mycol FROM single_nan +SELECT mycol FROM single_nan; ---- NULL -# Perform queries using MIN and MAX - -query I -SELECT max(int_col) FROM alltypes_plain ----- -1 - -query T -SELECT min(string_col) FROM alltypes_plain ----- -0 - -query I -SELECT max(bigint_col) FROM alltypes_plain ----- -10 - -query P -SELECT min(timestamp_col) FROM alltypes_plain ----- -2009-01-01T00:00:00 +# Clean up +statement ok +DROP TABLE single_nan; From c47f4e44cdfbe491592b4ec1dd80df037d62a86a Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Mon, 18 Dec 2023 20:46:39 -0500 Subject: [PATCH 8/8] remove unneeded optimizer setting; check type of timestamp column --- .../sqllogictest/test_files/parquet.slt | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index a0ee3d967b26..bbe7f33e260c 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -21,10 +21,6 @@ statement ok set datafusion.execution.target_partitions = 2; -# Automatically partition files over 1 byte -statement ok -set datafusion.optimizer.repartition_file_min_size = 1 - # Create a table as a data source statement ok CREATE TABLE src_table ( @@ -261,21 +257,24 @@ SELECT COUNT(*) FROM timestamp_with_tz; 131072 # Perform the query: -query IP -SELECT count, LAG(timestamp, 1) OVER (ORDER BY timestamp) +query IPT +SELECT + count, + LAG(timestamp, 1) OVER (ORDER BY timestamp), + arrow_typeof(LAG(timestamp, 1) OVER (ORDER BY timestamp)) FROM timestamp_with_tz LIMIT 10; ---- -0 NULL -0 2014-08-27T14:00:00Z -0 2014-08-27T14:00:00Z -4 2014-08-27T14:00:00Z -0 2014-08-27T14:00:00Z -0 2014-08-27T14:00:00Z -0 2014-08-27T14:00:00Z -14 2014-08-27T14:00:00Z -0 2014-08-27T14:00:00Z -0 2014-08-27T14:00:00Z +0 NULL Timestamp(Millisecond, Some("UTC")) +0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +4 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +14 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) # Clean up statement ok