Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

perf: Collapse expanded filters in eager #20493

Merged
merged 1 commit into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions crates/polars-plan/src/frame/opt_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,41 @@ impl OptFlags {
pub fn schema_only() -> Self {
Self::TYPE_COERCION | Self::TYPE_CHECK
}

pub fn eager(&self) -> bool {
self.contains(OptFlags::EAGER)
}

pub fn cluster_with_columns(&self) -> bool {
self.contains(OptFlags::CLUSTER_WITH_COLUMNS)
}

pub fn collapse_joins(&self) -> bool {
self.contains(OptFlags::COLLAPSE_JOINS)
}

pub fn predicate_pushdown(&self) -> bool {
self.contains(OptFlags::PREDICATE_PUSHDOWN)
}

pub fn projection_pushdown(&self) -> bool {
self.contains(OptFlags::PROJECTION_PUSHDOWN)
}
pub fn simplify_expr(&self) -> bool {
self.contains(OptFlags::SIMPLIFY_EXPR)
}
pub fn slice_pushdown(&self) -> bool {
self.contains(OptFlags::SLICE_PUSHDOWN)
}
pub fn streaming(&self) -> bool {
self.contains(OptFlags::STREAMING)
}
pub fn new_streaming(&self) -> bool {
self.contains(OptFlags::NEW_STREAMING)
}
pub fn fast_projection(&self) -> bool {
self.contains(OptFlags::FAST_PROJECTION)
}
}

impl Default for OptFlags {
Expand Down
7 changes: 7 additions & 0 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
predicates.push(n)
}
}
let multiple_filters = predicates.len() > 1;

for predicate in predicates {
let predicate = ExprIR::from_node(predicate, ctxt.expr_arena);
Expand All @@ -449,6 +450,12 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
let lp = IR::Filter { input, predicate };
input = run_conversion(lp, ctxt, "filter")?;
}

// Ensure that predicate are combined by optimizer
if ctxt.opt_flags.eager() && multiple_filters {
ctxt.opt_flags.insert(OptFlags::EAGER);
}

Ok(input)
} else {
ctxt.conversion_optimizer
Expand Down
44 changes: 18 additions & 26 deletions crates/polars-plan/src/plans/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,8 @@ pub fn optimize(
}
let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena, &mut opt_state)?;

// get toggle values
let cluster_with_columns = opt_state.contains(OptFlags::CLUSTER_WITH_COLUMNS);
let collapse_joins = opt_state.contains(OptFlags::COLLAPSE_JOINS);
let predicate_pushdown = opt_state.contains(OptFlags::PREDICATE_PUSHDOWN);
let projection_pushdown = opt_state.contains(OptFlags::PROJECTION_PUSHDOWN);
let simplify_expr = opt_state.contains(OptFlags::SIMPLIFY_EXPR);
let slice_pushdown = opt_state.contains(OptFlags::SLICE_PUSHDOWN);
let streaming = opt_state.contains(OptFlags::STREAMING);
let new_streaming = opt_state.contains(OptFlags::NEW_STREAMING);
let fast_projection = opt_state.contains(OptFlags::FAST_PROJECTION);

// Don't run optimizations that don't make sense on a single node.
// This keeps eager execution more snappy.
let eager = opt_state.contains(OptFlags::EAGER);
#[cfg(feature = "cse")]
let comm_subplan_elim = opt_state.contains(OptFlags::COMM_SUBPLAN_ELIM);

Expand All @@ -106,15 +94,16 @@ pub fn optimize(
let comm_subexpr_elim = false;

#[allow(unused_variables)]
let agg_scan_projection = opt_state.contains(OptFlags::FILE_CACHING) && !streaming && !eager;
let agg_scan_projection =
opt_state.contains(OptFlags::FILE_CACHING) && !opt_state.streaming() && !opt_state.eager();

// During debug we check if the optimizations have not modified the final schema.
#[cfg(debug_assertions)]
let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned();

// Collect members for optimizations that need it.
let mut members = MemberCollector::new();
if !eager && (comm_subexpr_elim || projection_pushdown) {
if !opt_state.eager() && (comm_subexpr_elim || opt_state.projection_pushdown()) {
members.collect(lp_top, lp_arena, expr_arena)
}

Expand All @@ -125,7 +114,7 @@ pub fn optimize(
set_order_flags(lp_top, lp_arena, expr_arena, scratch);
}

if simplify_expr {
if opt_state.simplify_expr() {
#[cfg(feature = "fused")]
rules.push(Box::new(fused::FusedArithmetic {}));
}
Expand Down Expand Up @@ -153,7 +142,7 @@ pub fn optimize(
let _cse_plan_changed = false;

// Should be run before predicate pushdown.
if projection_pushdown {
if opt_state.projection_pushdown() {
let mut projection_pushdown_opt = ProjectionPushDown::new();
let alp = lp_arena.take(lp_top);
let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
Expand All @@ -165,33 +154,36 @@ pub fn optimize(
}
}

if predicate_pushdown {
if opt_state.predicate_pushdown() {
let mut predicate_pushdown_opt = PredicatePushDown::new(expr_eval);
let alp = lp_arena.take(lp_top);
let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
lp_arena.replace(lp_top, alp);
}

if cluster_with_columns {
if opt_state.cluster_with_columns() {
cluster_with_columns::optimize(lp_top, lp_arena, expr_arena)
}

// Make sure it is after predicate pushdown
if collapse_joins && members.has_filter_with_join_input {
if opt_state.collapse_joins() && members.has_filter_with_join_input {
collapse_joins::optimize(lp_top, lp_arena, expr_arena)
}

// Make sure its before slice pushdown.
if fast_projection {
rules.push(Box::new(SimpleProjectionAndCollapse::new(eager)));
if opt_state.fast_projection() {
rules.push(Box::new(SimpleProjectionAndCollapse::new(
opt_state.eager(),
)));
}

if !eager {
if !opt_state.eager() {
rules.push(Box::new(DelayRechunk::new()));
}

if slice_pushdown {
let mut slice_pushdown_opt = SlicePushDown::new(streaming, new_streaming);
if opt_state.slice_pushdown() {
let mut slice_pushdown_opt =
SlicePushDown::new(opt_state.streaming(), opt_state.new_streaming());
let alp = lp_arena.take(lp_top);
let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;

Expand All @@ -202,11 +194,11 @@ pub fn optimize(
}
// This optimization removes branches, so we must do it when type coercion
// is completed.
if simplify_expr {
if opt_state.simplify_expr() {
rules.push(Box::new(SimplifyBooleanRule {}));
}

if !eager {
if !opt_state.eager() {
rules.push(Box::new(FlattenUnionRule {}));
}

Expand Down
Loading