-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Refactor SortPushdown using the standard top-down visitor and using EquivalenceProperties
#14821
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
Conversation
…factor in sort_pushdown_helper to make it more understandable
@@ -2203,7 +2203,7 @@ fn repartition_transitively_past_sort_with_projection() -> Result<()> { | |||
); | |||
|
|||
let expected = &[ | |||
"SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", | |||
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only test case change.
It's a no-op since we have only 1 partition here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @wiedld -- this looks like an improvment -- and thank you for all the comments. I don't fully understand where some of this code is coming from -- I will give it a more careful look in the morning
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Outdated
Show resolved
Hide resolved
let fetch = requirements.data.fetch.or(sort_fetch); | ||
let parent_is_stricter = plan | ||
.equivalence_properties() | ||
.requirements_compatible(&parent_reqs, &child_reqs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see where these requirements_compatible
calls come from. Is there an existing location?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The requirements_compatible (docs here) determine if the set of requirements is equal or stricter than the other set of requirements.
On current main, the handling of sort nodes includes the use of pushdown_requirement_to_children which itself uses the requirements_compatible. I simply removed some of the misdirection -- but I could add it back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope this is great -- I just was trying to pattern match
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @wiedld -- I went through this logic carefully and I think it makes sense and is much easier to understand now
I am in the process of running the planning benchmarks to make sure these changes don't cause a performance regression, but overall I think they look quite good to me.
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Outdated
Show resolved
Hide resolved
So maybe there is a slight degredation but overall it looks fine to me Full results
|
FYI @xudong963 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you @wiedld. I just have 2 questions
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Outdated
Show resolved
Hide resolved
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Outdated
Show resolved
Hide resolved
There is a new test on main (added 4 hours ago) which is failing. |
Testing for the win. That is a sign this code isn't sufficiently tested 🤔 |
It's failing sort pushdown on this test case:
It tries to push down only the The reason this doesn't happen on main is because they (1) immediately see if they can push down the sort to it's child (without the recursive call to walk down to the child), and then (2) if it cannot be pushed down then re-use the same SortExec node (this branch here), rather than reconstructing from the equivalence properties' ordering (which has constants removed). |
What are the output sort properties of
So then I would expect that the sort properties should be satisfied and this pass should eliminate the SortExec 🤔 |
I think I think the SQL looks like SELECT count() over () But maybe not 🤔 |
5a4ef63
to
4775354
Compare
The equivalence properties' ordering methods, as well as the Do you need me to make that^^ added commit into a separate PR? |
) -> bool { | ||
const_exprs.iter().any(|const_expr| { | ||
const_expr.expr.eq(expr) | ||
&& const_expr.across_partitions() != AcrossPartitions::Heterogeneous |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would find this code more explicit it it checked for == AcrossPartitions::Uniform
rather than != Heterogenious
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this (which is how other parts of the code check this):
index 1e1266b7a..e9e7eb2ca 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -223,7 +223,10 @@ pub fn uniform_const_exprs_contains(
) -> bool {
const_exprs.iter().any(|const_expr| {
const_expr.expr.eq(expr)
- && const_expr.across_partitions() != AcrossPartitions::Heterogeneous
+ && matches!(
+ const_expr.across_partitions(),
+ AcrossPartitions::Uniform { .. }
+ )
})
}
Thanks for giving me time to catch up. After digging more into the ordering calculation, and making a small PR with additional test cases (to demonstrate how I think its intended to work?), I now agree with @berkaysynnada. I'm going to undo the ordering calculation change (revert that single commit). Instead, I'm pushing up a temporary commit to show what test cases have a selective removal of the |
I took a look, but couldn't quite get the point you try to emphasize. Do you think that there is a problem in sort_pushdown or compute_properties of window operators?
I'll share my thoughts there
👍🏻 |
I'm still seeing the output_ordering() change which adds the heterogenous constant to the output_ordering, and I have said that doesn't seem correct to me. Did you mean to point out another part in that commit? |
I'm a bit confused about what are we trying to solve and how. Could you @wiedld tell me which issue or reproducer we are trying to solve in this collection of PR's please? I'd like to unblock you but unsure how to do it |
… with no partition by (in window agg), and when aggregating on an unordered column" This reverts commit 2ee747f.
We are trying to fix the coalesce bug. In the process, I've been adding docs and making small improvements to make the code easier to understand.
I've already removed this change.
The goal is to make the enforce sorting optimizer easier to understand, with this PR focused on the |
@@ -2242,7 +2242,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> { | |||
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet", | |||
], | |||
expected_plan: vec![ | |||
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]", | |||
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test change is actually ok.
The window agg is unpartitioned, and unbounded. Therefore the count value is constant across all rows; and therefore can be removed from the SortExec
. Therefore we think this change is an improvement.
The reason this plan got better is because the pushdown_sorts
is now recreating the sort from the eq props (that removes the constants), rather than re-attaching the existing sort node (as does main).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes are also going to cover the fix in sort pushdown:✅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went over this PR again carefully with @wiedld and we discussed the plan changes and they seem like improvements to me
@berkaysynnada I wonder if you would be interested sometime in a video call or something to discuss these passes and what we are doing. We would be happy to set one up and it might help us be more efficient in the bac and forths
EquivalenceProperties
Of course, we can. My username |
I reached out to find some time |
@berkaysynnada @wiedld and I had a brief meeting. The outcomes from my perspective is:
|
and I'll investigate the temporal bug in release 44 |
I couldn't get this today, sorry. That will be my first priority tomorrow |
No problem. Thank you @berkaysynnada . In the meantime, I'm double checking that you are ok with merging this PR for the refactor of the traversal? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am very sorry for being so late. I have a few questions, and once those are resolved, we can quickly merge this PR. I'll be more available now, so we can iterate faster.
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Outdated
Show resolved
Hide resolved
@@ -2242,7 +2242,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> { | |||
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet", | |||
], | |||
expected_plan: vec![ | |||
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]", | |||
"SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes are also going to cover the fix in sort pushdown:✅
// Sort col1 DESC | ||
// | ||
// Remove this redundant sort by not pushing down. | ||
let is_orthogonal_sort = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused about this logic. This is inside of an if
condition, which is !satisfy_parent && !parent_is_stricter
evaluates to true
. So, this must be equivalent to
let is_orthogonal_sort = !child_is_stricter;
However, don't we also know that if child is stricter, doesn't it satisfy parent?
So, the overall statement will be
let is_orthogonal_sort = true;
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Orthogonal sort is not the same as strictness.
I added two test cases demonstrating the difference: 843e1f8
After I added the test cases, I could confidently always push down the new child/current requires (rather than conditionally). See that^^ commit.
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Outdated
Show resolved
Hide resolved
BTW tests are passing after applying my suggestions. Do you want me to send a commit or apply it directly by yourself? We can also merge this, and then I can send a follow-up PR, if this is a blocker for you @wiedld |
1c608e4
to
8c1101d
Compare
8c1101d
to
843e1f8
Compare
🚀 thank you @wiedld and @berkaysynnada |
…EquivalenceProperties` (apache#14821) * refactor: have sort pushdown use transform_down, and provide minor refactor in sort_pushdown_helper to make it more understandable * test: inconsequential single change in test * Use consistent variable naming * chore: update variable naming * refactor: only sync the plan children when required * fix: have orderings include constants which are heterogenius across partitions * Revert "fix: have orderings include constants which are heterogenius across partitions" This reverts commit 4775354. * test: temporary commit to demonstrate changes that only occur with no partition by (in window agg), and when aggregating on an unordered column * Revert "test: temporary commit to demonstrate changes that only occur with no partition by (in window agg), and when aggregating on an unordered column" This reverts commit 2ee747f. * chore: cleanup after merging main, for anticipated test change * chore: rename variable * refactor: added test cases for orthogonal sorting, and remove 1 unneeded conditional * chore: remove unneeded conditional and make a comment --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Which issue does this PR close?
No issue.
Rationale for this change
It's a minor refactor on the
EnforceSorting
subrulesort_pushdown
. I was having a hard time reasoning and debugging a few things. Switching the traversal to a standard visitor pattern (transform_down
) made it a lot easier for me.What changes are included in this PR?
Move
pushdown_sorts
to use the standard top-down traversal.Only requires a bit of tweaking in the
pushdown_sorts_helper
.Add copious code docs.
Are these changes tested?
Yes, with existing tests.
Are there any user-facing changes?
No.