
Commit 26b1e13

alamb authored and appletreeisyellow committed
Fix regression with Incorrect results when reading parquet files with different schemas and statistics (#8533)
* Add test for schema evolution
* Fix reading parquet statistics
* Update tests for fix
* Add comments to help explain the test
* Add another test
1 parent 2092b49 commit 26b1e13
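
The regression fixed here stems from resolving parquet row-group statistics against the pruning predicate's (table) schema instead of the schema of the file actually being read, so files written under an evolved schema (for example, with columns in a different order) could have statistics matched to the wrong column. A minimal standalone sketch of that mismatch, not part of this commit, using only the arrow crate's schema API with made-up column names:

// Sketch (assumed example, not DataFusion code): the same column name can sit
// at different indices in the table schema and in a particular file's schema,
// so statistics must be resolved against the file schema.
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Table schema as the query engine sees it: c1 first, then c2.
    let table_schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    // A file produced under an evolved schema: c2 first, then c1.
    let file_schema = Schema::new(vec![
        Field::new("c2", DataType::Int32, false),
        Field::new("c1", DataType::Int32, false),
    ]);

    // Parquet row-group statistics are laid out in the file's column order, so
    // index 0 of this file holds c2's stats even though the table puts c1 first.
    let c1_in_table = table_schema.index_of("c1").unwrap(); // 0
    let c1_in_file = file_schema.index_of("c1").unwrap(); // 1
    assert_ne!(c1_in_table, c1_in_file);
    println!("c1: table column {c1_in_table}, file column {c1_in_file}");
}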

File tree

3 files changed (+256 -33 lines changed)


datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 6 additions & 3 deletions
@@ -468,8 +468,10 @@ impl FileOpener for ParquetOpener {
                 ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
                     .await?;
 
+            let file_schema = builder.schema().clone();
+
             let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(builder.schema())?;
+                schema_adapter.map_schema(&file_schema)?;
             // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;
 
             let mask = ProjectionMask::roots(
@@ -481,8 +483,8 @@ impl FileOpener for ParquetOpener {
             if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
                 let row_filter = row_filter::build_row_filter(
                     &predicate,
-                    builder.schema().as_ref(),
-                    table_schema.as_ref(),
+                    &file_schema,
+                    &table_schema,
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
@@ -507,6 +509,7 @@ impl FileOpener for ParquetOpener {
             let file_metadata = builder.metadata().clone();
             let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
             let mut row_groups = row_groups::prune_row_groups_by_statistics(
+                &file_schema,
                 builder.parquet_schema(),
                 file_metadata.row_groups(),
                 file_range,
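
With this change, ParquetOpener captures the file's own schema (file_schema) once and passes it both to the row filter and to row-group pruning, so pruning statistics are now resolved against the schema of the file being read rather than the predicate's table schema. A hypothetical helper sketching the kind of name-based lookup this relies on; the function, its signature, and its use of index_of are illustrative assumptions, not code from this commit:

use arrow::datatypes::Schema;
use parquet::schema::types::SchemaDescriptor;

// Hypothetical helper: locate the parquet leaf column holding statistics for a
// column referenced by the predicate, by name, using the schema of the file
// being read rather than the table schema's positional index.
fn parquet_column_index(
    column_name: &str,
    file_arrow_schema: &Schema,
    parquet_schema: &SchemaDescriptor,
) -> Option<usize> {
    // The column must exist in this particular file's Arrow schema ...
    file_arrow_schema.index_of(column_name).ok()?;
    // ... and its statistics live at the matching parquet column, found by name.
    (0..parquet_schema.num_columns())
        .find(|&i| parquet_schema.column(i).name() == column_name)
}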

datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs

Lines changed: 110 additions & 30 deletions
@@ -55,6 +55,7 @@ use super::ParquetFileMetrics;
 /// Note: This method currently ignores ColumnOrder
 /// <https://github.com/apache/arrow-datafusion/issues/8335>
 pub(crate) fn prune_row_groups_by_statistics(
+    arrow_schema: &Schema,
     parquet_schema: &SchemaDescriptor,
     groups: &[RowGroupMetaData],
     range: Option<FileRange>,
@@ -80,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics(
             let pruning_stats = RowGroupPruningStatistics {
                 parquet_schema,
                 row_group_metadata: metadata,
-                arrow_schema: predicate.schema().as_ref(),
+                arrow_schema,
             };
             match predicate.prune(&pruning_stats) {
                 Ok(values) => {
@@ -415,11 +416,11 @@ mod tests {
     fn row_group_pruning_predicate_simple_expr() {
         use datafusion_expr::{col, lit};
         // int > 1 => c1_max > 1
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
         let schema_descr = get_test_schema_descr(vec![field]);
@@ -435,6 +436,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2],
                 None,
@@ -449,11 +451,11 @@ mod tests {
     fn row_group_pruning_predicate_missing_stats() {
         use datafusion_expr::{col, lit};
         // int > 1 => c1_max > 1
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
         let schema_descr = get_test_schema_descr(vec![field]);
@@ -470,6 +472,7 @@ mod tests {
         // is null / undefined so the first row group can't be filtered out
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2],
                 None,
@@ -518,6 +521,7 @@ mod tests {
         // when conditions are joined using AND
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 groups,
                 None,
@@ -531,12 +535,13 @@ mod tests {
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
 
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 groups,
                 None,
@@ -547,6 +552,64 @@ mod tests {
         );
     }
 
+    #[test]
+    fn row_group_pruning_predicate_file_schema() {
+        use datafusion_expr::{col, lit};
+        // test row group predicate when file schema is different from table schema
+        // c1 > 0
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+        ]));
+        let expr = col("c1").gt(lit(0));
+        let expr = logical2physical(&expr, &table_schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, table_schema.clone()).unwrap();
+
+        // Model a file schema's column order c2 then c1, which is the opposite
+        // of the table schema
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("c2", DataType::Int32, false),
+            Field::new("c1", DataType::Int32, false),
+        ]));
+        let schema_descr = get_test_schema_descr(vec![
+            PrimitiveTypeField::new("c2", PhysicalType::INT32),
+            PrimitiveTypeField::new("c1", PhysicalType::INT32),
+        ]);
+        // rg1 has c2 less than zero, c1 greater than zero
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c2
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+            ],
+        );
+        // rg2 has c2 greater than zero, c1 less than zero
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+                ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false),
+            ],
+        );
+
+        let metrics = parquet_file_metrics();
+        let groups = &[rgm1, rgm2];
+        // the first row group should be left because c1 is greater than zero
+        // the second should be filtered out because c1 is less than zero
+        assert_eq!(
+            prune_row_groups_by_statistics(
+                &file_schema, // NB must be file schema, not table_schema
+                &schema_descr,
+                groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
+            vec![0]
+        );
+    }
+
     fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
         let schema_descr = get_test_schema_descr(vec![
             PrimitiveTypeField::new("c1", PhysicalType::INT32),
@@ -580,13 +643,14 @@ mod tests {
         let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
         let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();
 
         let metrics = parquet_file_metrics();
         // First row group was filtered out because it contains no null value on "c2".
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &groups,
                 None,
@@ -612,14 +676,15 @@ mod tests {
             .gt(lit(15))
             .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();
 
         let metrics = parquet_file_metrics();
         // bool = NULL always evaluates to NULL (and thus will not
         // pass predicates). Ideally these should both be false
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &groups,
                 None,
@@ -638,8 +703,11 @@ mod tests {
 
         // INT32: c1 > 5, the c1 is decimal(9,2)
         // The type of scalar value is decimal(9,2), don't need to do cast
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(9, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -650,8 +718,7 @@ mod tests {
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [1.00, 6.00]
@@ -679,6 +746,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
@@ -692,8 +760,11 @@ mod tests {
         // The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2).
         // We should convert all types to the coercion type, which is decimal(11,2)
         // The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(9, 0),
+            false,
+        )]));
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
             .with_logical_type(LogicalType::Decimal {
@@ -708,8 +779,7 @@ mod tests {
             Decimal128(11, 2),
         ));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [100, 600]
@@ -743,6 +813,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3, rgm4],
                 None,
@@ -753,8 +824,11 @@ mod tests {
         );
 
         // INT64: c1 < 5, the c1 is decimal(18,2)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(18, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -765,8 +839,7 @@ mod tests {
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [6.00, 8.00]
@@ -791,6 +864,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
@@ -802,8 +876,11 @@ mod tests {
 
         // FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
         // the type of parquet is decimal(18,2)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(18, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -817,8 +894,7 @@ mod tests {
         let left = cast(col("c1"), DataType::Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         // we must use big-endian when encoding the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -862,6 +938,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
@@ -873,8 +950,11 @@ mod tests {
 
         // BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
        // the type of parquet is decimal(18,2)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(18, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -888,8 +968,7 @@ mod tests {
         let left = cast(col("c1"), DataType::Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         // we must use big-endian when encoding the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -922,6 +1001,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
