Skip to content

Stop copying LogicalPlan and Exprs in DecorrelatePredicateSubquery #10318

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 62 additions & 36 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,18 @@ use crate::utils::replace_qualified_name;
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{plan_err, Result};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, plan_err, Result};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
use datafusion_expr::utils::{conjunction, split_conjunction};
use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned};
use datafusion_expr::{
exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
LogicalPlan, LogicalPlanBuilder, Operator,
};

use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use log::debug;

/// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left semi/anti joins
Expand All @@ -49,6 +50,16 @@ impl DecorrelatePredicateSubquery {
Self::default()
}

fn rewrite_subquery(
&self,
mut subquery: Subquery,
config: &dyn OptimizerConfig,
) -> Result<Subquery> {
subquery.subquery =
Arc::new(self.rewrite(unwrap_arc(subquery.subquery), config)?.data);
Ok(subquery)
}

/// Finds expressions that have the predicate subqueries (and recurses when found)
///
/// # Arguments
Expand All @@ -59,40 +70,32 @@ impl DecorrelatePredicateSubquery {
/// Returns a tuple (subqueries, non-subquery expressions)
fn extract_subquery_exprs(
&self,
predicate: &Expr,
predicate: Expr,
config: &dyn OptimizerConfig,
) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let filters = split_conjunction(predicate); // TODO: add ExistenceJoin to support disjunctions
let filters = split_conjunction_owned(predicate); // TODO: add ExistenceJoin to support disjunctions

let mut subqueries = vec![];
let mut others = vec![];
for it in filters.iter() {
for it in filters.into_iter() {
match it {
Expr::InSubquery(InSubquery {
expr,
subquery,
negated,
}) => {
let subquery_plan = self
.try_optimize(&subquery.subquery, config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let new_subquery = subquery.with_plan(subquery_plan);
let new_subquery = self.rewrite_subquery(subquery, config)?;
subqueries.push(SubqueryInfo::new_with_in_expr(
new_subquery,
(**expr).clone(),
*negated,
*expr,
negated,
));
}
Expr::Exists(Exists { subquery, negated }) => {
let subquery_plan = self
.try_optimize(&subquery.subquery, config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let new_subquery = subquery.with_plan(subquery_plan);
subqueries.push(SubqueryInfo::new(new_subquery, *negated));
let new_subquery = self.rewrite_subquery(subquery, config)?;
subqueries.push(SubqueryInfo::new(new_subquery, negated));
}
_ => others.push((*it).clone()),
expr => others.push(expr),
}
}

Expand All @@ -103,20 +106,46 @@ impl DecorrelatePredicateSubquery {
impl OptimizerRule for DecorrelatePredicateSubquery {
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
internal_err!("Should have called DecorrelatePredicateSubquery::rewrite")
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let LogicalPlan::Filter(filter) = plan else {
return Ok(Transformed::no(plan));
};

// if there are no subqueries in the predicate, return the original plan
let has_subqueries = split_conjunction(&filter.predicate)
.iter()
.any(|expr| matches!(expr, Expr::InSubquery(_) | Expr::Exists(_)));
if !has_subqueries {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the common case where there are no subqueries there will be no rewriting / cloning (which is the same as main

}

let Filter {
predicate, input, ..
} = filter;
let (subqueries, mut other_exprs) =
self.extract_subquery_exprs(&filter.predicate, config)?;
self.extract_subquery_exprs(predicate, config)?;
if subqueries.is_empty() {
// regular filter, no subquery exists clause here
return Ok(None);
return internal_err!(
"can not find expected subqueries in DecorrelatePredicateSubquery"
);
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = filter.input.as_ref().clone();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is one avoided clone of the input plan

let mut cur_input = unwrap_arc(input);
for subquery in subqueries {
if let Some(plan) =
build_join(&subquery, &cur_input, config.alias_generator())?
Expand All @@ -129,22 +158,22 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
query,
where_in_expr: Some(expr),
negated: false,
} => in_subquery(expr, query.subquery.clone()),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are clones of Arcs so while this PR removes them and it is less cloning realistically it doesn't matter

} => in_subquery(expr, query.subquery),
SubqueryInfo {
query,
where_in_expr: Some(expr),
negated: true,
} => not_in_subquery(expr, query.subquery.clone()),
} => not_in_subquery(expr, query.subquery),
SubqueryInfo {
query,
where_in_expr: None,
negated: false,
} => exists(query.subquery.clone()),
} => exists(query.subquery),
SubqueryInfo {
query,
where_in_expr: None,
negated: true,
} => not_exists(query.subquery.clone()),
} => not_exists(query.subquery),
};
other_exprs.push(sub_query_expr);
}
Expand All @@ -155,10 +184,7 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
cur_input = LogicalPlan::Filter(new_filter);
}
Ok(Some(cur_input))
}
_ => Ok(None),
}
Ok(Transformed::yes(cur_input))
}

fn name(&self) -> &str {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/scalar_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::sync::Arc;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;

use datafusion_common::Result;
use datafusion_common::{internal_err, Result};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the change from #10317 to get the CI to pass

use datafusion_expr::{
expr_vec_fmt, ColumnarValue, FuncMonotonicity, ScalarFunctionDefinition,
};
Expand Down
Loading