-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
[SPARK-32820][SQL] Remove redundant shuffle exchanges inserted by EnsureRequirements #29677
Changes from 2 commits
840ff97
1effe75
0da9e82
1e3216e
5680d48
ebdec14
23c8b70
e699cb6
d6e8cbe
95454a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,7 +52,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { | |
case (child, distribution) => | ||
val numPartitions = distribution.requiredNumPartitions | ||
.getOrElse(conf.numShufflePartitions) | ||
ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child) | ||
val newChild = if (child.isInstanceOf[ShuffleExchangeExec]) child.children.head else child | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This reminds me of #26946. cc @cloud-fan, @maryannxue and @stczwd FYI There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To avoid the case @HyukjinKwon pointed out above, it seems we need to check if Btw, in my opinion, to avoid complicating the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Do you mean we should check whether Just removing the existing
Having a new rule sounds better. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I didn't mean so; have you checked #26946? This PR currently removes shuffles incrrectly;
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have considered such a case but if a shuffle/partitioning performs after its immediate child of another shuffle/partitioning, is the child shuffle/partitioning meaningful? I checked #26946 and I understand that the root cause of that issue was the bug about how to handle hints in the parser so the approach which uses optimization was wrong and incomplete for that issue. The solutions are similar between this PR and that PR but the issue is different. I might misunderstand about what you point out and that PR, please correct me if my understanding is wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I got your point and it makes sense. Coud you update the PR description? The current one only describes the same output partioning cases. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I've updated. Thanks. |
||
ShuffleExchangeExec(distribution.createPartitioning(numPartitions), newChild) | ||
} | ||
|
||
// Get the indexes of children which have specified distribution requirements and need to have | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add some comments about why we can remove it?