Skip to content

Add LogicalPlanSignature and use in the optimizer loop #5623

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,5 @@ pub mod test;

pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
pub use utils::optimize_children;

mod plan_signature;
173 changes: 150 additions & 23 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
use crate::filter_null_join_keys::FilterNullJoinKeys;
use crate::inline_table_scan::InlineTableScan;
use crate::merge_projection::MergeProjection;
use crate::plan_signature::LogicalPlanSignature;
use crate::propagate_empty_relation::PropagateEmptyRelation;
use crate::push_down_filter::PushDownFilter;
use crate::push_down_limit::PushDownLimit;
Expand All @@ -47,7 +48,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use log::{debug, trace, warn};
use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

Expand Down Expand Up @@ -267,10 +268,14 @@ impl Optimizer {
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let options = config.options();
let analyzed_plan = Analyzer::default().execute_and_check(plan, options)?;
// execute_and_check has it's own timer
let mut new_plan = Analyzer::default().execute_and_check(plan, options)?;

let start_time = Instant::now();
let mut old_plan = Cow::Borrowed(&analyzed_plan);
let mut new_plan = analyzed_plan.clone();

let mut previous_plans = HashSet::with_capacity(16);
previous_plans.insert(LogicalPlanSignature::new(&new_plan));

let mut i = 0;
while i < options.optimizer.max_passes {
log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
Expand All @@ -280,18 +285,7 @@ impl Optimizer {

match result {
Ok(Some(plan)) => {
if !plan.schema().equivalent_names_and_types(new_plan.schema()) {
let e = DataFusionError::Internal(format!(
"Optimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
rule.name(),
new_plan.schema(),
plan.schema()
));
return Err(DataFusionError::Context(
rule.name().to_string(),
Box::new(e),
));
}
assert_schema_is_the_same(rule.name(), &new_plan, &plan)?;
new_plan = plan;
observer(&new_plan, rule.as_ref());
log_plan(rule.name(), &new_plan);
Expand Down Expand Up @@ -330,15 +324,14 @@ impl Optimizer {
}
log_plan(&format!("Optimized plan (pass {i})"), &new_plan);

// TODO this is an expensive way to see if the optimizer did anything and
// it would be better to change the OptimizerRule trait to return an Option
// instead
if old_plan.as_ref() == &new_plan {
// HashSet::insert returns, whether the value was newly inserted.
let plan_is_fresh =
previous_plans.insert(LogicalPlanSignature::new(&new_plan));
if !plan_is_fresh {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point we detected optimization cycle. Cycles are bad, so we exit. Exiting isn't ideal because our plan is optimized yet. We simply stopped applying rules.

IMO cycle should be error condition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a cycle shows a logical error in the optimizer - because at each step the performance should be improved.

At the same time, it does NOT imply a correctness error. The plan probably will be still correct (but not optimal).

IMO cycle should be error condition

We should take the perspective of a user. If a data scientist does research, should we harm the availability of the database?

IMO Maybe a log of this situation could be useful, with a configuration flag turning this into an error...

@findepi Btw. What motivated you to review this merged (one year ago) PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mslapek thanks for response!

What motivated you to review this merged (one year ago) PR?

I find the source PR as an efficient way to ask a question about the code and reach people with context -- code author(s) and reviewers -- in one shot.

Yes, a cycle shows a logical error in the optimizer - because at each step the performance should be improved.

Agreed!

At the same time, it does NOT imply a correctness error. The plan probably will be still correct (but not optimal).

Agreed!

We should take the perspective of a user. If a data scientist does research, should we harm the availability of the database?

Suboptimal plan can be orders of magnitude more expensive to execute, so allowing it to run may cause unavailability for others, but I see your point. It's especially difficult to transition from bug lenient treatment to more strict. It should be gradual. I like the idea of having this initially controlled with a flag. In tests and on CI this flag should be set to "fail". Then we can switch the flag on for runtime as well.

@alamb @jackwener thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for bringing this up @mslapek and @findepi

Suboptimal plan can be orders of magnitude more expensive to execute, so allowing it to run may cause unavailability for others, but I see your point. It's especially difficult to transition from bug lenient treatment to more strict. It should be gradual. I like the idea of having this initially controlled with a flag. In tests and on CI this flag should be set to "fail". Then we can switch the flag on for runtime as well.

My thoughts are that I can see the tradeoffs with both behavior:

  1. Fail fast (raise an error if cycle is detected) that @findepi increases the liklihood that such an error would be found and fixed quickly (as it would prevent the query in question from running)
  2. Lenient (ignore error) user still gets their answer (though maybe woudl take much longer)

One middle ground might be as @findepi suggests and use a flag -- we could default to raising an error if a cycle was detected but have a way for users to ignore the error and proceed anyways. As long as the error message said how to work around the error I think it would be fine.

In fact we have a similar setting already for failed optimizer rules, for many of the same reasons discussed, that we could model the behavior on datafusion.optimizer.skip_failed_rules: https://datafusion.apache.org/user-guide/configs.html

datafusion.optimizer.skip_failed_rules false When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filed #11285 for this

// plan did not change, so no need to continue trying to optimize
debug!("optimizer pass {} did not make changes", i);
break;
}
old_plan = Cow::Owned(new_plan.clone());
i += 1;
}
log_plan("Final optimized plan", &new_plan);
Expand Down Expand Up @@ -419,6 +412,34 @@ impl Optimizer {
}
}

/// Returns an error if plans have different schemas.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice refactor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Boy Scout Rule: "Leave the campground cleaner than you found it" 🤣

///
/// It ignores metadata and nullability.
fn assert_schema_is_the_same(
rule_name: &str,
prev_plan: &LogicalPlan,
new_plan: &LogicalPlan,
) -> Result<()> {
let equivalent = new_plan
.schema()
.equivalent_names_and_types(prev_plan.schema());

if !equivalent {
let e = DataFusionError::Internal(format!(
"Optimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
rule_name,
prev_plan.schema(),
new_plan.schema()
));
Err(DataFusionError::Context(
String::from(rule_name),
Box::new(e),
))
} else {
Ok(())
}
}

/// Log the plan in debug/tracing mode after some part of the optimizer runs
fn log_plan(description: &str, plan: &LogicalPlan) {
debug!("{description}:\n{}\n", plan.display_indent());
Expand All @@ -432,8 +453,10 @@ mod tests {
use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, Result};
use datafusion_expr::logical_plan::EmptyRelation;
use datafusion_expr::{col, LogicalPlan, LogicalPlanBuilder, Projection};
use std::sync::Arc;
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};
use std::sync::{Arc, Mutex};

use super::ApplyOrder;

#[test]
fn skip_failing_rule() {
Expand Down Expand Up @@ -512,6 +535,58 @@ mod tests {
Ok(())
}

#[test]
fn optimizer_detects_plan_equal_to_the_initial() -> Result<()> {
// Run a goofy optimizer, which rotates projection columns
// [1, 2, 3] -> [2, 3, 1] -> [3, 1, 2] -> [1, 2, 3]

let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(false))]);
let config = OptimizerContext::new().with_max_passes(16);

let initial_plan = LogicalPlanBuilder::empty(false)
.project([lit(1), lit(2), lit(3)])?
.project([lit(100)])? // to not trigger changed schema error
.build()?;

let mut plans: Vec<LogicalPlan> = Vec::new();
let final_plan =
opt.optimize(&initial_plan, &config, |p, _| plans.push(p.clone()))?;

// initial_plan is not observed, so we have 3 plans
assert_eq!(3, plans.len());

// we got again the initial_plan with [1, 2, 3]
assert_eq!(initial_plan, final_plan);

Ok(())
}

#[test]
fn optimizer_detects_plan_equal_to_a_non_initial() -> Result<()> {
// Run a goofy optimizer, which reverses and rotates projection columns
// [1, 2, 3] -> [3, 2, 1] -> [2, 1, 3] -> [1, 3, 2] -> [3, 2, 1]

let opt = Optimizer::with_rules(vec![Arc::new(RotateProjectionRule::new(true))]);
let config = OptimizerContext::new().with_max_passes(16);

let initial_plan = LogicalPlanBuilder::empty(false)
.project([lit(1), lit(2), lit(3)])?
.project([lit(100)])? // to not trigger changed schema error
.build()?;

let mut plans: Vec<LogicalPlan> = Vec::new();
let final_plan =
opt.optimize(&initial_plan, &config, |p, _| plans.push(p.clone()))?;

// initial_plan is not observed, so we have 4 plans
assert_eq!(4, plans.len());

// we got again the plan with [3, 2, 1]
assert_eq!(plans[0], final_plan);

Ok(())
}

fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
let new_fields = schema
.fields()
Expand Down Expand Up @@ -569,4 +644,56 @@ mod tests {
"get table_scan rule"
}
}

/// A goofy rule doing rotation of columns in all projections.
///
/// Useful to test cycle detection.
struct RotateProjectionRule {
// reverse exprs instead of rotating on the first pass
reverse_on_first_pass: Mutex<bool>,
}

impl RotateProjectionRule {
fn new(reverse_on_first_pass: bool) -> Self {
Self {
reverse_on_first_pass: Mutex::new(reverse_on_first_pass),
}
}
}

impl OptimizerRule for RotateProjectionRule {
fn try_optimize(
&self,
plan: &LogicalPlan,
_: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let projection = match plan {
LogicalPlan::Projection(p) if p.expr.len() >= 2 => p,
_ => return Ok(None),
};

let mut exprs = projection.expr.clone();

let mut reverse = self.reverse_on_first_pass.lock().unwrap();
if *reverse {
exprs.reverse();
*reverse = false;
} else {
exprs.rotate_left(1);
}

Ok(Some(LogicalPlan::Projection(Projection::try_new(
exprs,
projection.input.clone(),
)?)))
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}

fn name(&self) -> &str {
"rotate_projection"
}
}
}
132 changes: 132 additions & 0 deletions datafusion/optimizer/src/plan_signature.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{
collections::hash_map::DefaultHasher,
convert::Infallible,
hash::{Hash, Hasher},
num::NonZeroUsize,
};

use datafusion_expr::{LogicalPlan, PlanVisitor};

/// Non-unique identifier of a [`LogicalPlan`].
///
/// See [`LogicalPlanSignature::new`] for details.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct LogicalPlanSignature {
node_number: NonZeroUsize,
plan_hash: u64,
}

impl LogicalPlanSignature {
/// Returns [`LogicalPlanSignature`] of the given [`LogicalPlan`].
///
/// It is a kind of [`LogicalPlan`] hashing with stronger guarantees.
///
/// # Guarantees
///
/// Consider two [`LogicalPlan`]s `p1` and `p2`.
///
/// If `p1` and `p2` have a different number of [`LogicalPlan`]s, then
/// they will have different [`LogicalPlanSignature`]s.
///
/// If `p1` and `p2` have a different [`Hash`], then
/// they will have different [`LogicalPlanSignature`]s.
///
/// # Caveats
///
/// The intention of [`LogicalPlanSignature`] is to have a lower chance
/// of hash collisions.
///
/// There exist different [`LogicalPlan`]s with the same
/// [`LogicalPlanSignature`].
///
/// When two [`LogicalPlan`]s differ only in metadata, then they will have
/// the same [`LogicalPlanSignature`]s (due to hash implementation in
/// [`LogicalPlan`]).
pub fn new(plan: &LogicalPlan) -> Self {
let mut hasher = DefaultHasher::new();
plan.hash(&mut hasher);

Self {
node_number: get_node_number(plan),
plan_hash: hasher.finish(),
}
}
}

/// Get total number of [`LogicalPlan`]s in the plan.
fn get_node_number(plan: &LogicalPlan) -> NonZeroUsize {
struct Visitor {
node_number: usize,
}

impl PlanVisitor for Visitor {
type Error = Infallible;

fn pre_visit(&mut self, _: &LogicalPlan) -> Result<bool, Self::Error> {
self.node_number += 1;
Ok(true)
}
}

let mut v = Visitor { node_number: 0 };
plan.accept(&mut v).unwrap(); // Infallible

// Visitor must have at least visited the root,
// so v.node_number is at least 1.
NonZeroUsize::new(v.node_number).unwrap()
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use datafusion_common::{DFSchema, Result};
use datafusion_expr::{self, lit, LogicalPlan};

use crate::plan_signature::get_node_number;

#[test]
fn node_number_for_some_plan() -> Result<()> {
let schema = Arc::new(DFSchema::empty());

let one_node_plan =
Arc::new(LogicalPlan::EmptyRelation(datafusion_expr::EmptyRelation {
produce_one_row: false,
schema: schema.clone(),
}));

assert_eq!(1, get_node_number(&one_node_plan).get());

let two_node_plan = Arc::new(LogicalPlan::Projection(
datafusion_expr::Projection::try_new(vec![lit(1), lit(2)], one_node_plan)?,
));

assert_eq!(2, get_node_number(&two_node_plan).get());

let five_node_plan = Arc::new(LogicalPlan::Union(datafusion_expr::Union {
inputs: vec![two_node_plan.clone(), two_node_plan],
schema,
}));

assert_eq!(5, get_node_number(&five_node_plan).get());

Ok(())
}
}