From e5bb4aa6b54af39d75a366a29b4447a887071705 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 5 Feb 2024 17:17:02 +0300 Subject: [PATCH 1/3] Consider table scan filter during analysis of optimize projections --- .../optimizer/src/optimize_projections.rs | 76 ++++++++++++++++++- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index d8d7f71d7143..760f40b0cdfb 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -330,6 +330,12 @@ fn optimize_projections( vec![(left_child_indices, true), (right_child_indices, true)] } LogicalPlan::TableScan(table_scan) => { + // Get required field indices by filter expressions + let referred_indices = indices_referred_by_exprs( + &table_scan.projected_schema, + table_scan.filters.iter(), + )?; + let indices = merge_slices(indices, &referred_indices); let schema = table_scan.source.schema(); // Get indices referred to in the original (schema with all fields) // given projected indices. 
@@ -904,11 +910,13 @@ mod tests { use crate::optimize_projections::OptimizeProjections; use crate::test::{assert_optimized_plan_eq, test_table_scan}; - use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::{Result, TableReference}; + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use async_trait::async_trait; + use datafusion_common::{DFSchema, Result, TableReference}; use datafusion_expr::{ binary_expr, col, count, lit, logical_plan::builder::LogicalPlanBuilder, not, table_scan, try_cast, when, Expr, Like, LogicalPlan, Operator, + TableProviderFilterPushDown, TableScan, TableSource, TableType, }; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { @@ -1193,4 +1201,68 @@ mod tests { \n TableScan: test projection=[a]"; assert_optimized_plan_equal(&plan, expected) } + + struct PushDownProvider { + pub filter_support: TableProviderFilterPushDown, + } + + #[async_trait] + impl TableSource for PushDownProvider { + fn schema(&self) -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + fn supports_filter_pushdown( + &self, + _e: &Expr, + ) -> Result<TableProviderFilterPushDown> { + Ok(self.filter_support.clone()) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + } + + fn table_scan_with_pushdown_provider( + filter_support: TableProviderFilterPushDown, + ) -> Result<LogicalPlan> { + let test_provider = PushDownProvider { filter_support }; + + let table_scan = LogicalPlan::TableScan(TableScan { + table_name: "test".into(), + filters: vec![], + projected_schema: Arc::new(DFSchema::try_from( + (*test_provider.schema()).clone(), + )?), + projection: None, + source: Arc::new(test_provider), + fetch: None, + }); + + LogicalPlanBuilder::from(table_scan) + .filter(col("a").eq(lit(1i64)))? 
+ .build() + } + + #[test] + fn filter_with_table_provider_exact() -> Result<()> { + let table_scan = + table_scan_with_pushdown_provider(TableProviderFilterPushDown::Exact)?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![try_cast(col("b"), DataType::Float64)])? + .build()?; + + let expected = "Projection: TRY_CAST(b AS Float64)\ + \n Filter: a = Int64(1)\ + \n TableScan: test projection=[a, b]"; + assert_optimized_plan_equal(&plan, expected) + } } From 576657bdd324261e03d1f47995d694020008ca2a Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 5 Feb 2024 17:31:00 +0300 Subject: [PATCH 2/3] Change test --- .../optimizer/src/optimize_projections.rs | 80 +++++-------------- 1 file changed, 18 insertions(+), 62 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index 760f40b0cdfb..7dda88c78804 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -909,14 +909,15 @@ mod tests { use std::sync::Arc; use crate::optimize_projections::OptimizeProjections; - use crate::test::{assert_optimized_plan_eq, test_table_scan}; - use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use async_trait::async_trait; - use datafusion_common::{DFSchema, Result, TableReference}; + use crate::test::{ + assert_optimized_plan_eq, test_table_scan, test_table_scan_fields, + }; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::{Result, TableReference}; + use datafusion_expr::builder::table_scan_with_filters; use datafusion_expr::{ binary_expr, col, count, lit, logical_plan::builder::LogicalPlanBuilder, not, table_scan, try_cast, when, Expr, Like, LogicalPlan, Operator, - TableProviderFilterPushDown, TableScan, TableSource, TableType, }; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { @@ -1202,67 +1203,22 @@ mod tests { assert_optimized_plan_equal(&plan, expected) } - 
struct PushDownProvider { - pub filter_support: TableProviderFilterPushDown, - } - - #[async_trait] - impl TableSource for PushDownProvider { - fn schema(&self) -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ])) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - fn supports_filter_pushdown( - &self, - _e: &Expr, - ) -> Result<TableProviderFilterPushDown> { - Ok(self.filter_support.clone()) - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } - } - - fn table_scan_with_pushdown_provider( - filter_support: TableProviderFilterPushDown, - ) -> Result<LogicalPlan> { - let test_provider = PushDownProvider { filter_support }; - - let table_scan = LogicalPlan::TableScan(TableScan { - table_name: "test".into(), - filters: vec![], - projected_schema: Arc::new(DFSchema::try_from( - (*test_provider.schema()).clone(), - )?), - projection: None, - source: Arc::new(test_provider), - fetch: None, - }); - - LogicalPlanBuilder::from(table_scan) - .filter(col("a").eq(lit(1i64)))? - .build() - } - #[test] - fn filter_with_table_provider_exact() -> Result<()> { - let table_scan = - table_scan_with_pushdown_provider(TableProviderFilterPushDown::Exact)?; + fn filter_with_table_optimize_projection() -> Result<()> { + let schema = Schema::new(test_table_scan_fields()); + let table_scan = table_scan_with_filters( + Some("test"), + &schema, + None, + vec![col("b").is_not_null()], + )? + .build()?; let plan = LogicalPlanBuilder::from(table_scan) - .project(vec![try_cast(col("b"), DataType::Float64)])? + .project(vec![col("a")])? 
.build()?; - let expected = "Projection: TRY_CAST(b AS Float64)\ - \n Filter: a = Int64(1)\ - \n TableScan: test projection=[a, b]"; + let expected = "Projection: test.a\ + \n TableScan: test projection=[a, b], full_filters=[b IS NOT NULL]"; assert_optimized_plan_equal(&plan, expected) } } From bc842bcca357cbbf0fe612065d3f3586c922dab6 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 5 Feb 2024 18:27:48 +0300 Subject: [PATCH 3/3] Fix failing test --- datafusion/optimizer/src/push_down_projection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index 6a003ecb5fa8..643dd9efa6e0 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -486,7 +486,7 @@ mod tests { let expected = "\ Projection: Int32(1) AS a\ - \n TableScan: test projection=[], full_filters=[b = Int32(1)]"; + \n TableScan: test projection=[b], full_filters=[b = Int32(1)]"; assert_optimized_plan_eq(&plan, expected) }