From 92394958ae10b58a7abf51615f0be76cc992dccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20S=C5=82apek?= <28485371+mslapek@users.noreply.github.com> Date: Fri, 17 Mar 2023 20:13:47 +0100 Subject: [PATCH 1/2] Add LogicalPlanSignature and use in the optimizer loop --- datafusion/optimizer/src/lib.rs | 2 + datafusion/optimizer/src/optimizer.rs | 173 ++++++++++++++++++--- datafusion/optimizer/src/plan_signature.rs | 132 ++++++++++++++++ 3 files changed, 284 insertions(+), 23 deletions(-) create mode 100644 datafusion/optimizer/src/plan_signature.rs diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 7f930ae3a8d0..4be7bb370628 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -50,3 +50,5 @@ pub mod test; pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule}; pub use utils::optimize_children; + +mod plan_signature; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 01e945119c46..daa69a9f469e 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -31,6 +31,7 @@ use crate::extract_equijoin_predicate::ExtractEquijoinPredicate; use crate::filter_null_join_keys::FilterNullJoinKeys; use crate::inline_table_scan::InlineTableScan; use crate::merge_projection::MergeProjection; +use crate::plan_signature::LogicalPlanSignature; use crate::propagate_empty_relation::PropagateEmptyRelation; use crate::push_down_filter::PushDownFilter; use crate::push_down_limit::PushDownLimit; @@ -47,7 +48,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use log::{debug, trace, warn}; -use std::borrow::Cow; +use std::collections::HashSet; use std::sync::Arc; use std::time::Instant; @@ -267,10 +268,14 @@ impl Optimizer { F: FnMut(&LogicalPlan, &dyn OptimizerRule), { let options = config.options(); - let analyzed_plan = Analyzer::default().execute_and_check(plan, options)?; + // execute_and_check has it's own timer + let mut new_plan = Analyzer::default().execute_and_check(plan, options)?; + let start_time = Instant::now(); - let mut old_plan = Cow::Borrowed(&analyzed_plan); - let mut new_plan = analyzed_plan.clone(); + + let mut previous_plans = HashSet::with_capacity(16); + previous_plans.insert(LogicalPlanSignature::compute(&new_plan)); + let mut i = 0; while i < options.optimizer.max_passes { log_plan(&format!("Optimizer input (pass {i})"), &new_plan); @@ -280,18 +285,7 @@ impl Optimizer { match result { Ok(Some(plan)) => { - if !plan.schema().equivalent_names_and_types(new_plan.schema()) { - let e = DataFusionError::Internal(format!( - "Optimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}", - rule.name(), - new_plan.schema(), - plan.schema() - )); - return Err(DataFusionError::Context( - rule.name().to_string(), - Box::new(e), - )); - } + assert_schema_is_the_same(rule.name(), &new_plan, &plan)?; new_plan = plan; observer(&new_plan, rule.as_ref()); log_plan(rule.name(), &new_plan); @@ -330,15 +324,14 @@ impl Optimizer { } log_plan(&format!("Optimized plan (pass {i})"), &new_plan); - // TODO this is an expensive way to see if the optimizer did anything and - // it would be better to change the OptimizerRule trait to return an Option - // instead - if old_plan.as_ref() == &new_plan { + // HashSet::insert returns, whether the value was newly inserted. + let plan_is_fresh = + previous_plans.insert(LogicalPlanSignature::compute(&new_plan)); + if !plan_is_fresh { // plan did not change, so no need to continue trying to optimize debug!("optimizer pass {} did not make changes", i); break; } - old_plan = Cow::Owned(new_plan.clone()); i += 1; } log_plan("Final optimized plan", &new_plan); @@ -419,6 +412,34 @@ impl Optimizer { } } +/// Returns an error if plans have different schemas. +/// +/// It ignores metadata and nullability. +fn assert_schema_is_the_same( + rule_name: &str, + prev_plan: &LogicalPlan, + new_plan: &LogicalPlan, +) -> Result<()> { + let equivalent = new_plan + .schema() + .equivalent_names_and_types(prev_plan.schema()); + + if !equivalent { + let e = DataFusionError::Internal(format!( + "Optimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}", + rule_name, + prev_plan.schema(), + new_plan.schema() + )); + Err(DataFusionError::Context( + String::from(rule_name), + Box::new(e), + )) + } else { + Ok(()) + } +} + /// Log the plan in debug/tracing mode after some part of the optimizer runs fn log_plan(description: &str, plan: &LogicalPlan) { debug!("{description}:\n{}\n", plan.display_indent()); @@ -432,8 +453,10 @@ mod tests { use crate::{OptimizerConfig, OptimizerContext, OptimizerRule}; use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, Result}; use datafusion_expr::logical_plan::EmptyRelation; - use datafusion_expr::{col, LogicalPlan, LogicalPlanBuilder, Projection}; - use std::sync::Arc; + use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection}; + use std::sync::{Arc, Mutex}; + + use super::ApplyOrder; #[test] fn skip_failing_rule() { @@ -512,6 +535,58 @@ mod tests { Ok(()) } + #[test] + fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> { + // Run a goofy optimizer, which rotates projection columns + // [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3] + + let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]); + let config = OptimizerContext::new().with_max_passes(16); + + let mut b = LogicalPlanBuilder::empty(false); + b = b.project([lit(1), lit(2), lit(3)])?; + b = b.project([lit(100)])?; // to not trigger changed schema error + let initial_plan = b.build()?; + + let mut plans: Vec = Vec::new(); + let final_plan = + opt.optimize(&initial_plan, &config, |p, _| plans.push(p.clone()))?; + + // initial_plan is not observed, so we have 3 plans + assert_eq!(3, plans.len()); + + // we got again the initial_plan with [1, 2, 3] + assert_eq!(initial_plan, final_plan); + + Ok(()) + } + + #[test] + fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> { + // Run a goofy optimizer, which reverses and rotates projection columns + // [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1] + + let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]); + let config = OptimizerContext::new().with_max_passes(16); + + let mut b = LogicalPlanBuilder::empty(false); + b = b.project([lit(1), lit(2), lit(3)])?; + b = b.project([lit(100)])?; // to not trigger changed schema error + let initial_plan = b.build()?; + + let mut plans: Vec = Vec::new(); + let final_plan = + opt.optimize(&initial_plan, &config, |p, _| plans.push(p.clone()))?; + + // initial_plan is not observed, so we have 4 plans + assert_eq!(4, plans.len()); + + // we got again the plan with [3, 2, 1] + assert_eq!(plans[0], final_plan); + + Ok(()) + } + fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef { let new_fields = schema .fields() @@ -569,4 +644,56 @@ mod tests { "get table_scan rule" } } + + /// A goofy rule doing rotation of columns in all projections. + /// + /// Useful to test cycle detection. + struct RotateProjectionRule { + // reverse exprs instead on rotating on the first pass + reverse_on_first_pass: Mutex, + } + + impl RotateProjectionRule { + fn new(reverse_on_first_pass: bool) -> Self { + Self { + reverse_on_first_pass: Mutex::new(reverse_on_first_pass), + } + } + } + + impl OptimizerRule for RotateProjectionRule { + fn try_optimize( + &self, + plan: &LogicalPlan, + _: &dyn OptimizerConfig, + ) -> Result> { + let projection = match plan { + LogicalPlan::Projection(p) if p.expr.len() >= 2 => p, + _ => return Ok(None), + }; + + let mut exprs = projection.expr.clone(); + + let mut reverse = self.reverse_on_first_pass.lock().unwrap(); + if *reverse { + exprs.reverse(); + *reverse = false; + } else { + exprs.rotate_left(1); + } + + Ok(Some(LogicalPlan::Projection(Projection::try_new( + exprs, + projection.input.clone(), + )?))) + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + + fn name(&self) -> &str { + "rotate_projection" + } + } } diff --git a/datafusion/optimizer/src/plan_signature.rs b/datafusion/optimizer/src/plan_signature.rs new file mode 100644 index 000000000000..77ae79a7f9dd --- /dev/null +++ b/datafusion/optimizer/src/plan_signature.rs @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + collections::hash_map::DefaultHasher, + convert::Infallible, + hash::{Hash, Hasher}, + num::NonZeroUsize, +}; + +use datafusion_expr::{LogicalPlan, PlanVisitor}; + +/// Non-unique identifier of a [`LogicalPlan`]. +/// +/// See [`LogicalPlanSignature::compute`] for details. +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct LogicalPlanSignature { + node_number: NonZeroUsize, + plan_hash: u64, +} + +impl LogicalPlanSignature { + /// Returns [`LogicalPlanSignature`] of the given [`LogicalPlan`]. + /// + /// It is a kind of [`LogicalPlan`] hashing with stronger guarantees. + /// + /// # Guarantees + /// + /// Consider two [`LogicalPlan`]s `p1` and `p2`. + /// + /// If `p1` and `p2` have a different number of [`LogicalPlan`]s, then + /// they will have different [`LogicalPlanSignature`]s. + /// + /// If `p1` and `p2` have a different [`Hash`], then + /// they will have different [`LogicalPlanSignature`]s. + /// + /// # Caveats + /// + /// The intention of [`LogicalPlanSignature`] is to have a lower chance + /// of hash collisions. + /// + /// There exist different [`LogicalPlan`]s with the same + /// [`LogicalPlanSignature`]. + /// + /// When two [`LogicalPlan`]s differ only in metadata, then they will have + /// the same [`LogicalPlanSignature`]s (due to hash implementation in + /// [`LogicalPlan`]). + pub fn compute(plan: &LogicalPlan) -> Self { + let mut hasher = DefaultHasher::new(); + plan.hash(&mut hasher); + + Self { + node_number: get_node_number(plan), + plan_hash: hasher.finish(), + } + } +} + +/// Get total number of [`LogicalPlan`]s in the plan. +fn get_node_number(plan: &LogicalPlan) -> NonZeroUsize { + struct Visitor { + node_number: usize, + } + + impl PlanVisitor for Visitor { + type Error = Infallible; + + fn pre_visit(&mut self, _: &LogicalPlan) -> Result { + self.node_number += 1; + Ok(true) + } + } + + let mut v = Visitor { node_number: 0 }; + plan.accept(&mut v).unwrap(); // Infallible + + // Visitor must have at least visited the root, + // so v.node_number is at least 1. + NonZeroUsize::new(v.node_number).unwrap() +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion_common::{DFSchema, Result}; + use datafusion_expr::{self, lit, LogicalPlan}; + + use crate::plan_signature::get_node_number; + + #[test] + fn node_number_for_some_plan() -> Result<()> { + let schema = Arc::new(DFSchema::empty()); + + let one_node_plan = + Arc::new(LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation { + produce_one_row: false, + schema: schema.clone(), + })); + + assert_eq!(1, get_node_number(&one_node_plan).get()); + + let two_node_plan = Arc::new(LogicalPlan::Projection( + datafusion_expr::Projection::try_new(vec![lit(1), lit(2)], one_node_plan)?, + )); + + assert_eq!(2, get_node_number(&two_node_plan).get()); + + let five_node_plan = Arc::new(LogicalPlan::Union(datafusion_expr::Union { + inputs: vec![two_node_plan.clone(), two_node_plan], + schema, + })); + + assert_eq!(5, get_node_number(&five_node_plan).get()); + + Ok(()) + } +} From 3d1d791c28a1b5ae34f9162e290a361a7c1c70bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20S=C5=82apek?= <28485371+mslapek@users.noreply.github.com> Date: Sat, 18 Mar 2023 20:13:47 +0100 Subject: [PATCH 2/2] CR fix --- datafusion/optimizer/src/optimizer.rs | 22 +++++++++++----------- datafusion/optimizer/src/plan_signature.rs | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index daa69a9f469e..35557b1252e4 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -274,7 +274,7 @@ impl Optimizer { let start_time = Instant::now(); let mut previous_plans = HashSet::with_capacity(16); - previous_plans.insert(LogicalPlanSignature::compute(&new_plan)); + previous_plans.insert(LogicalPlanSignature::new(&new_plan)); let mut i = 0; while i < options.optimizer.max_passes { @@ -326,7 +326,7 @@ impl Optimizer { // HashSet::insert returns, whether the value was newly inserted. let plan_is_fresh = - previous_plans.insert(LogicalPlanSignature::compute(&new_plan)); + previous_plans.insert(LogicalPlanSignature::new(&new_plan)); if !plan_is_fresh { // plan did not change, so no need to continue trying to optimize debug!("optimizer pass {} did not make changes", i); @@ -543,10 +543,10 @@ mod tests { let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]); let config = OptimizerContext::new().with_max_passes(16); - let mut b = LogicalPlanBuilder::empty(false); - b = b.project([lit(1), lit(2), lit(3)])?; - b = b.project([lit(100)])?; // to not trigger changed schema error - let initial_plan = b.build()?; + let initial_plan = LogicalPlanBuilder::empty(false) + .project([lit(1), lit(2), lit(3)])? + .project([lit(100)])? // to not trigger changed schema error + .build()?; let mut plans: Vec = Vec::new(); let final_plan = @@ -569,10 +569,10 @@ mod tests { let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]); let config = OptimizerContext::new().with_max_passes(16); - let mut b = LogicalPlanBuilder::empty(false); - b = b.project([lit(1), lit(2), lit(3)])?; - b = b.project([lit(100)])?; // to not trigger changed schema error - let initial_plan = b.build()?; + let initial_plan = LogicalPlanBuilder::empty(false) + .project([lit(1), lit(2), lit(3)])? + .project([lit(100)])? // to not trigger changed schema error + .build()?; let mut plans: Vec = Vec::new(); let final_plan = @@ -649,7 +649,7 @@ mod tests { /// /// Useful to test cycle detection. struct RotateProjectionRule { - // reverse exprs instead on rotating on the first pass + // reverse exprs instead of rotating on the first pass reverse_on_first_pass: Mutex, } diff --git a/datafusion/optimizer/src/plan_signature.rs b/datafusion/optimizer/src/plan_signature.rs index 77ae79a7f9dd..64c06835ad3b 100644 --- a/datafusion/optimizer/src/plan_signature.rs +++ b/datafusion/optimizer/src/plan_signature.rs @@ -26,7 +26,7 @@ use datafusion_expr::{LogicalPlan, PlanVisitor}; /// Non-unique identifier of a [`LogicalPlan`]. /// -/// See [`LogicalPlanSignature::compute`] for details. +/// See [`LogicalPlanSignature::new`] for details. #[derive(Clone, Copy, PartialEq, Eq, Hash)] pub struct LogicalPlanSignature { node_number: NonZeroUsize, @@ -59,7 +59,7 @@ impl LogicalPlanSignature { /// When two [`LogicalPlan`]s differ only in metadata, then they will have /// the same [`LogicalPlanSignature`]s (due to hash implementation in /// [`LogicalPlan`]). - pub fn compute(plan: &LogicalPlan) -> Self { + pub fn new(plan: &LogicalPlan) -> Self { let mut hasher = DefaultHasher::new(); plan.hash(&mut hasher);