Matt/recursive cte eliminate distribution and coalesce in recursive children #2

Draft
wants to merge 15 commits into
base: matt/recursive-cte

matthewgapp
Owner

@matthewgapp matthewgapp commented Sep 21, 2023

This is a WIP. It attempts to eliminate unnecessary (and costly) physical plan steps within a recursive query, speeding recursive queries up by ~30x, and perhaps more in some cases.

Specifically, it prevents RepartitionExec and CoalesceExec from being inserted when they would otherwise be descendants of a RecursiveQueryExec. It still allows them to be applied to parts of the plan that aren't under a RecursiveQueryExec.

The PR makes changes to DistributionContext and introduces the context pattern into the Coalesce physical optimizer to determine if a given plan is within a RecursiveQuery.
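
A minimal sketch of the context pattern described above, not the code in this PR: the `Plan` enum, the `Context` struct, and the `distribute` function are hypothetical stand-ins for DataFusion's physical plan nodes and optimizer rules. It assumes a top-down pass that carries an `in_recursive_query` flag and skips adding a repartition step while the flag is set.

```rust
/// Hypothetical, simplified plan node (not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(&'static str),
    Filter(Box<Plan>),
    Repartition(Box<Plan>),
    RecursiveQuery {
        static_term: Box<Plan>,
        recursive_term: Box<Plan>,
    },
}

/// Context passed down the tree: are we below a recursive query node?
#[derive(Clone, Copy)]
struct Context {
    in_recursive_query: bool,
}

/// Adds a `Repartition` below every `Filter`, except under a `RecursiveQuery`.
fn distribute(plan: Plan, ctx: Context) -> Plan {
    match plan {
        Plan::Scan(name) => Plan::Scan(name),
        Plan::Filter(input) => {
            let input = distribute(*input, ctx);
            if ctx.in_recursive_query {
                // Inside the recursion: leave the plan single-partitioned.
                Plan::Filter(Box::new(input))
            } else {
                Plan::Filter(Box::new(Plan::Repartition(Box::new(input))))
            }
        }
        Plan::Repartition(input) => Plan::Repartition(Box::new(distribute(*input, ctx))),
        Plan::RecursiveQuery { static_term, recursive_term } => {
            // Everything below this node sees the flag set.
            let inner = Context { in_recursive_query: true };
            Plan::RecursiveQuery {
                static_term: Box::new(distribute(*static_term, inner)),
                recursive_term: Box::new(distribute(*recursive_term, inner)),
            }
        }
    }
}

fn main() {
    // Filter over a recursive query whose recursive term also has a filter.
    let plan = Plan::Filter(Box::new(Plan::RecursiveQuery {
        static_term: Box::new(Plan::Scan("users")),
        recursive_term: Box::new(Plan::Filter(Box::new(Plan::Scan("nodes")))),
    }));
    let optimized = distribute(plan, Context { in_recursive_query: false });
    println!("{optimized:#?}");
}
```

In this toy plan the outer filter still gets a repartition step, while the filter inside the recursive term does not, which mirrors the shape of the "After" plan in the example.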

Example:

EXPLAIN WITH RECURSIVE nodes(id, region, account, increase) AS (
    SELECT CAST(id as BIGINT) as id, users.region as region, account, 0 as increase
    FROM users

    UNION ALL

    SELECT id + 1 as id, growth.region as region, account + growth.increase as account, growth.increase as increase
    FROM nodes
    JOIN growth ON nodes.region = growth.region
    WHERE id < 10000
)
SELECT id, account, region FROM nodes
WHERE id > 9950

Before

cargo run -r time: 6.5 seconds

+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                           |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: users.id, users.account, region                                                                                                                    |
|               |   Filter: CAST(users.id AS Int64) > Int64(99500)                                                                                                               |
|               |     Projection: users.id, region, users.account                                                                                                                |
|               |       RecursiveQuery: is_distinct=false                                                                                                                        |
|               |         Projection: users.id, users.region AS region, users.account, Int64(0) AS increase                                                                      |
|               |           TableScan: users projection=[id, account, region]                                                                                                    |
|               |         Projection: CAST(users.id AS Int64) + Int64(1) AS id, growth.region AS region, users.account + growth.increase AS account, growth.increase AS increase |
|               |           Inner Join: nodes.region = growth.region                                                                                                             |
|               |             Filter: CAST(users.id AS Int64) < Int64(100000)                                                                                                    |
|               |               Projection: users.id, nodes.region, users.account                                                                                                |
|               |                 NamedRelation: nodes                                                                                                                           |
|               |             TableScan: growth projection=[increase, region]                                                                                                    |
| physical_plan | ProjectionExec: expr=[id@0 as id, account@2 as account, region@1 as region]                                                                                    |
|               |   CoalesceBatchesExec: target_batch_size=8192                                                                                                                  |
|               |     FilterExec: CAST(id@0 AS Int64) > 99500                                                                                                                    |
|               |       RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1                                                                                    |
|               |         ProjectionExec: expr=[id@0 as id, region@1 as region, account@2 as account]                                                                            |
|               |           RecursiveQueryExec: is_distinct=false                                                                                                                |
|               |             ProjectionExec: expr=[id@0 as id, region@2 as region, account@1 as account, 0 as increase]                                                         |
|               |               MemoryExec: partitions=1, partition_sizes=[1]                                                                                                    |
|               |             CoalescePartitionsExec                                                                                                                             |
|               |               ProjectionExec: expr=[CAST(id@0 AS Int64) + 1 as id, region@4 as region, account@2 + increase@3 as account, increase@3 as increase]              |
|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                    |
|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(region@1, region@1)]                                                                   |
|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                |
|               |                       RepartitionExec: partitioning=Hash([region@1], 12), input_partitions=12                                                                  |
|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                            |
|               |                           FilterExec: CAST(id@0 AS Int64) < 100000                                                                                             |
|               |                             RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1                                                              |
|               |                               ProjectionExec: expr=[id@0 as id, region@1 as region, account@2 as account]                                                      |
|               |                                 ContinuanceExec: name=nodes                                                                                                    |
|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                |
|               |                       RepartitionExec: partitioning=Hash([region@1], 12), input_partitions=12                                                                  |
|               |                         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1                                                                  |
|               |                           MemoryExec: partitions=1, partition_sizes=[1]                                                                                        |
|               |                                                                                                                                                                |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+

After

cargo run -r time: 220ms

+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                            |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: id, users.account, region                                                                                                           |
|               |   Filter: id > Int64(9950)                                                                                                                      |
|               |     Projection: id, region, users.account                                                                                                       |
|               |       RecursiveQuery: is_distinct=false                                                                                                         |
|               |         Projection: CAST(users.id AS Int64) AS id, users.region AS region, users.account, Int64(0) AS increase                                  |
|               |           TableScan: users projection=[id, account, region]                                                                                     |
|               |         Projection: nodes.id + Int64(1) AS id, growth.region AS region, users.account + growth.increase AS account, growth.increase AS increase |
|               |           Inner Join: nodes.region = growth.region                                                                                              |
|               |             Filter: nodes.id < Int64(10000)                                                                                                     |
|               |               Projection: nodes.id, nodes.region, users.account                                                                                 |
|               |                 NamedRelation: nodes                                                                                                            |
|               |             TableScan: growth projection=[increase, region]                                                                                     |
| physical_plan | ProjectionExec: expr=[id@0 as id, account@2 as account, region@1 as region]                                                                     |
|               |   CoalesceBatchesExec: target_batch_size=8192                                                                                                   |
|               |     FilterExec: id@0 > 9950                                                                                                                     |
|               |       RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1                                                                     |
|               |         ProjectionExec: expr=[id@0 as id, region@1 as region, account@2 as account]                                                             |
|               |           RecursiveQueryExec: is_distinct=false                                                                                                 |
|               |             ProjectionExec: expr=[CAST(id@0 AS Int64) as id, region@2 as region, account@1 as account, 0 as increase]                           |
|               |               MemoryExec: partitions=1, partition_sizes=[1]                                                                                     |
|               |             ProjectionExec: expr=[id@0 + 1 as id, region@4 as region, account@2 + increase@3 as account, increase@3 as increase]                |
|               |               HashJoinExec: mode=Partitioned, join_type=Inner, on=[(region@1, region@1)]                                                        |
|               |                 FilterExec: id@0 < 10000                                                                                                        |
|               |                   ProjectionExec: expr=[id@0 as id, region@1 as region, account@2 as account]                                                   |
|               |                     ContinuanceExec: name=nodes                                                                                                 |
|               |                 MemoryExec: partitions=1, partition_sizes=[1]                                                                                   |
|               |                                                                                                                                                 |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------+

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@matthewgapp matthewgapp force-pushed the matt/recursive-cte-eliminate-distribution-and-coalesce-in-recursive-children branch from 81c3439 to b7e59f7 Compare November 18, 2023 22:08
@matthewgapp
Owner Author

Rebased onto the updated (rebased) CTE PR. Plan to have this PR ready by tomorrow.

@matthewgapp matthewgapp force-pushed the matt/recursive-cte-eliminate-distribution-and-coalesce-in-recursive-children branch from 2461576 to ce3cdcd Compare November 19, 2023 01:12
@matthewgapp matthewgapp force-pushed the matt/recursive-cte branch 2 times, most recently from 7dba5b6 to 219de0c Compare January 9, 2024 06:27
Repository owner deleted a comment from tobarbaro Feb 10, 2024
matthewgapp pushed a commit that referenced this pull request Apr 12, 2024
* refactor `TreeNode::rewrite()`

* use handle_tree_recursion in `Expr`

* use macro for transform recursions

* fix api

* minor fixes

* fix

* don't trust `t.transformed` coming from transformation closures, keep the old way of detecting if changes were made

* rephrase todo comment, always propagate up `t.transformed` from the transformation closure, fix projection pushdown closure

* Fix `TreeNodeRecursion` docs

* extend Skip (Prune) functionality to Jump as it is defined in https://synnada.notion.site/synnada/TreeNode-Design-Proposal-bceac27d18504a2085145550e267c4c1

* fix Jump and add tests

* jump test fixes

* fix clippy

* unify "transform" traversals using macros, fix "visit" traversal jumps, add visit jump tests, ensure consistent naming `f` instead of `op`, `f_down` instead of `pre_visit` and `f_up` instead of `post_visit`

* fix macro rewrite

* minor fixes

* minor fix

* refactor tests

* add transform tests

* add apply, transform_down and transform_up tests

* refactor tests

* test jump on both a and e nodes in both top-down and bottom-up traversals

* better transform/rewrite tests

* minor fix

* simplify tests

* add stop tests, reorganize tests

* fix previous merges and remove leftover file

* Review TreeNode Refactor (#1)

* Minor changes

* Jump doesn't ignore f_up

* update test

* Update rewriter

* LogicalPlan visit update and propagate from children flags

* Update tree_node.rs

* Update map_children's

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>

* fix

* minor fixes

* fix f_up call when f_down returns jump

* simplify code

* minor fix

* revert unnecessary changes

* fix `DynTreeNode` and `ConcreteTreeNode` `transformed` and `tnr` propagation

* introduce TransformedResult helper

* fix docs

* restore transform as alias to transform_up

* restore transform as alias to transform_up 2

* Simplifications and comment improvements (#2)

---------

Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>
Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
@@ -59,6 +59,8 @@ use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAg

use itertools::izip;

use super::utils::is_recursive_query;
Owner Author

hey this is a test
