fix: fetch is missed during EnforceDistribution #14207


Closed
xudong963 wants to merge 7 commits into main from fix_enforce_distribution

Conversation

@xudong963 (Member) commented Jan 20, 2025

Which issue does this PR close?

Rationale for this change

The root of the bug is that if a SortPreservingMergeExec operator has a fetch, the method (remove_dist_changing_operators) removes it outright and the fetch is lost. So it is definitely a bug.

IIUC, SortPreservingMergeExec is only added after EnforceSorting, so to reproduce the bug EnforceDistribution needs to run after EnforceSorting. The default physical optimizer orders the rules EnforceDistribution -> EnforceSorting, so it does not trigger the bug (a missed fetch), but if we run EnforceDistribution again after EnforceSorting, the bug surfaces.

What the PR does is retain the fetch of the removed SortPreservingMergeExec and add it back if necessary, as the method's comment says: If they are necessary, they will be added in subsequent stages.
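
For illustration, a simplified sketch of this idea (not the PR's actual code: the helper names are made up, the name()-based operator check stands in for the real downcasts, and import paths assume the datafusion umbrella crate):

use std::sync::Arc;
use datafusion::physical_expr::LexOrdering;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Unwrap distribution-changing operators, remembering any `fetch` they carried
/// instead of silently dropping it.
fn unwrap_remembering_fetch(
    mut plan: Arc<dyn ExecutionPlan>,
) -> (Arc<dyn ExecutionPlan>, Option<usize>) {
    let mut fetch: Option<usize> = None;
    while matches!(
        plan.name(),
        "RepartitionExec" | "CoalescePartitionsExec" | "SortPreservingMergeExec"
    ) {
        // Keep the most restrictive limit seen so far.
        fetch = match (fetch, plan.fetch()) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (a, b) => a.or(b),
        };
        // All three of these operators have exactly one child.
        let child = Arc::clone(&plan.children()[0]);
        plan = child;
    }
    (plan, fetch)
}

/// If the remembered fetch was never consumed, put it back on top as a
/// SortPreservingMergeExec, which later passes may still simplify away.
fn restore_fetch(plan: Arc<dyn ExecutionPlan>, fetch: Option<usize>) -> Arc<dyn ExecutionPlan> {
    if fetch.is_none() {
        return plan;
    }
    let ordering = plan
        .output_ordering()
        .cloned()
        .unwrap_or_else(LexOrdering::default);
    Arc::new(SortPreservingMergeExec::new(ordering, plan).with_fetch(fetch))
}

In the real rule this state is threaded through remove_dist_changing_operators and the existing helpers rather than standalone functions, but the flow is the same: remember the fetch while unwrapping, re-apply it when rebuilding.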

What changes are included in this PR?

Avoid missing fetch during EnforceDistribution.

Are these changes tested?

YES

Are there any user-facing changes?

NO

@xudong963 xudong963 marked this pull request as draft January 20, 2025 10:27
@github-actions github-actions bot added the core Core DataFusion crate label Jan 20, 2025
@xudong963 xudong963 force-pushed the fix_enforce_distribution branch from bbc28fd to ce29d78 Compare January 21, 2025 06:25
@xudong963 xudong963 marked this pull request as ready for review January 21, 2025 06:26
@xudong963 xudong963 force-pushed the fix_enforce_distribution branch from 0ae365c to a276491 Compare January 23, 2025 01:27
@github-actions github-actions bot added the optimizer Optimizer rules label Jan 23, 2025
@xudong963 xudong963 force-pushed the fix_enforce_distribution branch from a276491 to 6f40f4a Compare January 23, 2025 01:30
@berkaysynnada (Contributor):

This week I couldn't spare time to review this fix, sorry @xudong963. It will be one of my priorities next week.

@alamb (Contributor) commented Jan 28, 2025

I plan to review this PR later today or tomorrow as it is on my "45 blockers" list

Thank you for your patience @xudong963

@xudong963 (Member Author):

thanks @berkaysynnada @alamb

@alamb (Contributor) commented Jan 29, 2025

I merged up from main to resolve a conflict on this PR as part of my review

@alamb (Contributor) left a comment

Thank you @xudong963 -- it is quite impressive that you got this implementation working.

What I don't understand is how this fix relates to running EnforceDistribution twice. I understand the bug surfaces when EnforceDistribution is run twice, but it seems like the fix is to avoid optimizing away SPMs that have a limit on them.

It seems like there is something wrong with EnforceDistribution itself, and the bug happens to manifest when EnforceDistribution is run twice.

I left some possible ideas for cleaning up the code, but what I think is really important is to get some unit-test style tests showing what this fix is doing.

Does this make sense?

) -> Result<(DistributionContext, Option<usize>)> {
    let mut children = vec![];
    let mut fetch = None;
    for child in context.children.into_iter() {

Contributor:

Since the DistributionContext is already passed through most of the functions in this code, I wonder if you considered adding a fetch field, like

struct DistributionContext {
  ...
  /// Limit which must be applied to any sort preserving merge that is created
  fetch: Option<usize>
}

🤔
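
For illustration, one possible shape of that suggestion, using a simplified stand-in for DataFusion's internal DistributionContext (field names here are hypothetical, not the PR's actual definitions):

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

/// Simplified stand-in for the per-node context threaded through EnforceDistribution.
struct DistributionContext {
    plan: Arc<dyn ExecutionPlan>,
    /// Stand-in for the existing per-node flag carried by the real context.
    data: bool,
    /// Limit which must be applied to any sort preserving merge that is created
    /// for this subtree; `None` if no limit was removed.
    fetch: Option<usize>,
    children: Vec<DistributionContext>,
}

impl DistributionContext {
    /// Absorb the fetch of an operator that is about to be removed, keeping the
    /// most restrictive limit seen so far.
    fn absorb_fetch(&mut self, removed: &Arc<dyn ExecutionPlan>) {
        self.fetch = match (self.fetch, removed.fetch()) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (a, b) => a.or(b),
        };
    }
}

Carrying the fetch in the context means any helper that rebuilds the subtree can consult it without extra function parameters.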

Member Author:

Makes sense, will give it a try.

Comment on lines 1390 to 1406
plan = Arc::new(
    SortPreservingMergeExec::new(
        plan.output_ordering()
            .unwrap_or(&LexOrdering::default())
            .clone(),
        plan,
    )
    .with_fetch(fetch.take()),
)

Contributor:

Why does this add back a SortPreservingMerge without sort exprs? Wouldn't it be better to use a GlobalLimitExec or something?

Member Author:

If we use GlobalLimitExec, we also need to maintain the skip, which I think adds code complexity (we would maybe need to add a skip() method to ExecutionPlan), so I directly follow the comment of remove_dist_changing_operators: If they are necessary, they will be added in subsequent stages.

Member Author:

And then the SortPreservingMerge may be optimized/removed.
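
A side-by-side sketch of the two options discussed in this thread (the signatures are as I understand the datafusion API around this time and may differ slightly between versions; neither function is part of the PR):

use std::sync::Arc;
use datafusion::physical_expr::LexOrdering;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::ExecutionPlan;

/// Re-adding the limit as a GlobalLimitExec would also force this rule to track
/// `skip`, which it otherwise never needs.
fn readd_as_global_limit(
    input: Arc<dyn ExecutionPlan>,
    skip: usize,
    fetch: Option<usize>,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(GlobalLimitExec::new(input, skip, fetch))
}

/// What the PR does instead: re-create the SortPreservingMergeExec carrying the
/// remembered fetch; later passes may still simplify or remove it.
fn readd_as_spm(
    input: Arc<dyn ExecutionPlan>,
    ordering: LexOrdering,
    fetch: Option<usize>,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(SortPreservingMergeExec::new(ordering, input).with_fetch(fetch))
}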

|_, _| (),
)?;

let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![

Contributor:

I am somewhat worried about this test being brittle -- it seems to require a very specific sequence of optimizer passes. And I worry that if the default sequence of passes is changed, this test might no longer cover the issue.

I actually tried to reproduce the results by just adding EnforceDistribution at the end of the default list of optimizers and the issue did not manifest itself 🤔

    let planner = DefaultPhysicalPlanner::default();
    let session_state = SessionStateBuilder::new()
        .with_config(ctx.copied_config())
        .with_default_features()
        .with_physical_optimizer_rule(Arc::new(EnforceDistribution::new()))// -- Add enforce distribution rule again
        .build();
    let optimized_physical_plan = planner
        .create_physical_plan(&optimized_logical_plan, &session_state)
        .await?;

Member Author:

The second EnforceDistribution should be run before removing OutputRequirements.

You can try this one:

    let planner = DefaultPhysicalPlanner::default();
    let session_state = SessionStateBuilder::new()
        .with_config(ctx.copied_config())
        .with_default_features()
        .with_physical_optimizer_rule(Arc::new(OutputRequirements::new_add_mode()))
        .with_physical_optimizer_rule(Arc::new(EnforceDistribution::new())) // -- Add enforce distribution rule again
        .with_physical_optimizer_rule(Arc::new(OutputRequirements::new_remove_mode()))
        .build();
    let optimized_physical_plan = planner
        .create_physical_plan(&optimized_logical_plan, &session_state)
        .await?;

Member Author:

Add a comment about this: cf04fd7

@@ -3172,3 +3181,78 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn apply_enforce_distribution_multiple_times() -> Result<()> {

Contributor:

I think it is nice to have this "end to end" style test, but given the amount of code changed I think it is important to have more "unit style" tests; otherwise it is hard to understand how general this fix is (or whether it just works for the specified query).

I wonder if you could construct some cases using the same framework as the tests above? Aka make a plan and then run EnforceDistribution twice on it and ensure the plans are ok?

Or perhaps you can update assert_optimized! to ensure that running EnforceDistribution twice doesn't change the plan again.
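
A minimal sketch of what such an idempotency check could look like (a hypothetical helper, not the existing assert_optimized! macro; import paths assume the datafusion umbrella crate and may differ by version):

use std::sync::Arc;
use datafusion::common::config::ConfigOptions;
use datafusion::common::Result;
use datafusion::physical_optimizer::enforce_distribution::EnforceDistribution;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::{displayable, ExecutionPlan};

/// Run EnforceDistribution twice and assert the second pass is a no-op.
fn assert_enforce_distribution_idempotent(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    let config = ConfigOptions::default();
    let rule = EnforceDistribution::new();

    let once = rule.optimize(plan, &config)?;
    let first = displayable(once.as_ref()).indent(true).to_string();

    let twice = rule.optimize(Arc::clone(&once), &config)?;
    let second = displayable(twice.as_ref()).indent(true).to_string();

    assert_eq!(first, second, "EnforceDistribution is not idempotent for this plan");
    Ok(())
}

If assert_optimized! grew an equivalent second pass internally, every existing test case would check this property for free.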

Member Author:

Makes sense.

Contributor:

Besides this test, a data test in .slt's would still be helpful IMO

@xudong963 (Member Author):

Thanks, @alamb, I'm on vacation, will reply asap

@@ -986,18 +993,24 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
/// ```
fn remove_dist_changing_operators(

Member Author:

The root of the bug is that if a SortPreservingMergeExec operator has a fetch, this method removes it outright and the fetch is lost.

IIUC, SortPreservingMergeExec is only added after EnforceSorting, so to reproduce the bug EnforceDistribution needs to run after EnforceSorting. The default physical optimizer orders the rules EnforceDistribution -> EnforceSorting, so it does not trigger the bug (a missed fetch), but if we run EnforceDistribution again after EnforceSorting, the bug surfaces.

What the PR does is retain the fetch of the removed SortPreservingMergeExec and add it back if necessary, as the method's comment says: If they are necessary, they will be added in subsequent stages.

@xudong963 (Member Author):

What I don't understand is how this fix relates to running EnforceDistribution twice. I understand the bug surfaces when EnforceDistribution is run twice, but it seems like the fix is to avoid optimizing away SPMs that have a limit on them.

It seems like there is something wrong with EnforceDistribution itself, and the bug happens to manifest when EnforceDistribution is run twice.

Thanks for your review @alamb, I left some replies in the thread: #14207 (comment)

@alamb (Contributor) commented Feb 10, 2025

I wonder if this PR is related to

@alamb alamb mentioned this pull request Feb 16, 2025
@berkaysynnada (Contributor):

@xudong963 the bug still lives?

@xudong963 (Member Author) commented Feb 18, 2025

Yes, the bug still lives.

The root of the bug is that if a SortPreservingMergeExec operator has a fetch, the method (remove_dist_changing_operators) removes it outright and the fetch is lost. So it is definitely a bug.

IIUC, SortPreservingMergeExec is only added after EnforceSorting, so to reproduce the bug EnforceDistribution needs to run after EnforceSorting. The default physical optimizer orders the rules EnforceDistribution -> EnforceSorting, so it does not trigger the bug (a missed fetch), but if we run EnforceDistribution again after EnforceSorting, the bug surfaces.

What the PR does is retain the fetch of the removed SortPreservingMergeExec and add it back if necessary, as the method's comment says: If they are necessary, they will be added in subsequent stages.

@xudong963 xudong963 force-pushed the fix_enforce_distribution branch from e6d833d to 4d24c5b Compare February 18, 2025 06:24
@xudong963 (Member Author):

I'm still resolving the comments from @alamb. I'll ping you @alamb @berkaysynnada to review once I'm ready!

@xudong963 xudong963 marked this pull request as draft February 18, 2025 09:39
@xudong963 xudong963 marked this pull request as ready for review February 18, 2025 14:44
@xudong963 (Member Author) commented Feb 18, 2025

cc @wiedld, you may be interested in this PR. And cc @alamb @berkaysynnada, the PR is ready for review.

@xudong963 xudong963 mentioned this pull request Feb 15, 2025
@alamb (Contributor) commented Feb 23, 2025

I plan to review this one carefully tomorrow

@berkaysynnada (Contributor) left a comment

Thank you @xudong963 for both identifying and fixing this subtle bug. I have a few suggestions to help prevent similar issues with other operators in the future.

Unfortunately, idempotency problems still exist in the physical planner, particularly in the output requirements & enforce sorting pair, and in enforce distribution (based on my previous observations). We need to keep pushing on these deficiencies.

Arc::new(OutputRequirements::new_remove_mode()),
Arc::new(ProjectionPushdown::new()),
Arc::new(LimitPushdown::new()),
Arc::new(SanityCheckPlan::new()),

Contributor:

https://github.com/apache/datafusion/pull/14207/files#r1959171412
If that's the case, why don't we just pass through the minimal reproducer rules?

Member Author:

good point

Member Author:

Done in ffb1eb3.

while is_repartition(&distribution_context.plan)
    || is_coalesce_partitions(&distribution_context.plan)
    || is_sort_preserving_merge(&distribution_context.plan)
{
    if is_sort_preserving_merge(&distribution_context.plan) {

Contributor:

Why do we limit the fetch extraction to only the sort preserving merge operator?

Member Author:

Oh, CoalescePartitionsExec also has a fetch. I recall it was added recently; I'll record an issue to add it later.

I think we should also add a test verifying that missing the fetch on CoalescePartitionsExec actually causes a problem; that test may be difficult to reproduce, so let's do it in a separate PR.
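
One hedged way to generalize beyond SortPreservingMergeExec would be to ask the removed operator itself via ExecutionPlan::fetch(), so any fetch-carrying operator is covered automatically (sketch only; the helper name and the min-combining rule are assumptions, not the PR's code):

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

/// Remember the limit of an operator that is about to be removed, regardless of
/// its concrete type; if several removed operators carried limits, the smallest
/// (most restrictive) one wins.
fn remember_fetch(removed: &Arc<dyn ExecutionPlan>, remembered: &mut Option<usize>) {
    *remembered = match (*remembered, removed.fetch()) {
        (Some(a), Some(b)) => Some(a.min(b)),
        (a, b) => a.or(b),
    };
}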

let child_plan = Arc::clone(&context.children[0].plan);
context.plan = Arc::new(CoalescePartitionsExec::new(child_plan));
context.data.fetch = fetch;

Contributor:

Don't we need to set the fetch for all operators that have one? If so, let's do that before these if/else blocks -- that will also eliminate line 1078.

let mut optimized_distribution_ctx =
    DistributionContext::new(Arc::clone(&plan), data.clone(), children);

// If `fetch` was not consumed, it means that there was `SortPreservingMergeExec` with fetch before

Contributor:

Are we sure the fetch belongs to the sort preserving merge? Even if that is so at the moment, what will happen if more operators gain fetch capability?

Member Author:

Good question, do you have any suggestions? 🤔

Member Author:

Also cc @alamb here. I think @berkaysynnada's comment makes a lot of sense, but I don't currently have a good way to solve it; any suggestions/ideas will be appreciated ❤️

@xudong963 (Member Author):

Besides this test, a data test in .slt's would still be helpful IMO

Yes, but I couldn't find a sqllogictest that reproduces this bug without changing the default optimizer rules.

@alamb alamb mentioned this pull request Feb 24, 2025
@alamb (Contributor) commented Feb 24, 2025

Besides this test, a data test in .slt's would still be helpful IMO

Yes, but I couldn't find a sqllogictest that reproduces this bug without changing the default optimizer rules.

I plan to review this PR and the surrounding code more carefully later today. I don't understand it well enough to offer useful suggestions at this time

@xudong963 xudong963 force-pushed the fix_enforce_distribution branch from 9b7a41c to ffb1eb3 Compare February 25, 2025 03:02
Comment on lines 1007 to +1011
while is_repartition(&distribution_context.plan)
    || is_coalesce_partitions(&distribution_context.plan)
    || is_sort_preserving_merge(&distribution_context.plan)
{
    if is_sort_preserving_merge(&distribution_context.plan) {

@xudong963 (Member Author) commented Feb 26, 2025

I have a more straightforward, "blunt" way to fix the bug: don't remove the operator if it has a fetch. This will definitely guarantee correctness. cc @berkaysynnada
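
A sketch of what that blunter check might look like (the string comparison on name() is purely illustrative; the real code downcasts to the concrete operator types, and this helper is not part of the PR):

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

/// A distribution-changing operator is only considered removable when it carries
/// no fetch, so no limit can ever be lost by this rule.
fn is_removable_dist_changer(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let dist_changer = matches!(
        plan.name(),
        "RepartitionExec" | "CoalescePartitionsExec" | "SortPreservingMergeExec"
    );
    dist_changer && plan.fetch().is_none()
}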

Contributor:

Guarantees correctness but sacrifices plan simplicity? Can we see the idea and its outcomes?

@xudong963 (Member Author) commented Feb 26, 2025

I'm not sure whether it will sacrifice plan simplicity, but correctness is the baseline.

Another thought is to record which operators (with fetch) are removed and add them back at the end.
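
A sketch of that second idea, recording the removed fetch-carrying operators and re-attaching them once the distribution work is done (a hypothetical helper, assuming the removed nodes are kept in removal order from outermost to innermost):

use std::sync::Arc;
use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Re-attach previously removed operators on top of the optimized subtree,
/// innermost first and outermost last, so the original nesting (and every
/// fetch) is restored.
fn readd_removed_operators(
    mut plan: Arc<dyn ExecutionPlan>,
    removed: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    for op in removed.into_iter().rev() {
        plan = op.with_new_children(vec![plan])?;
    }
    Ok(plan)
}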

Contributor:

BTW, I also don't have a clear answer atm; I need to review and recall the details. Even if we cannot generalize the rule now, one way to proceed is to manually handle the plans that currently have a fetch (like SortPreservingMergeExec and CoalescePartitionsExec). But perhaps a more comprehensive fix is what we need to fully resolve the issue.

@berkaysynnada (Contributor) commented Feb 26, 2025

I'm not sure whether it will sacrifice plan simplicity, but correctness is the baseline.

Is there a correctness issue right now (unless you change the physical optimizer list)? Of course, we always aim for idempotent and orthogonal rules, but to provide those I think we shouldn't break the existing planning results under the default and optimal rule set.

Member Author:

Is there a correctness issue right now (unless you change the physical optimizer list)?

Yes, the correctness issue only surfaces if the physical optimizer list changes, such as by adding an extra EnforceDistribution. Because we allow users to customize their physical optimizer rules, it's risky.

It's not urgent; we can keep the current state until we find a better way to fix it.

@xudong963 xudong963 marked this pull request as draft February 26, 2025 11:35
@xudong963 xudong963 closed this Apr 25, 2025
Labels
core (Core DataFusion crate), optimizer (Optimizer rules)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Bug: applying multiple times EnforceDistribution generates invalid plan
3 participants