[SPARK-32820][SQL] Remove redundant shuffle exchanges inserted by EnsureRequirements #29677
Conversation
```diff
@@ -52,7 +52,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+        val newChild = if (child.isInstanceOf[ShuffleExchangeExec]) child.children.head else child
```
Could you add some comments about why we can remove it?
Thanks @sarutak for making this change. I have a question about whether this optimization should be done on the user side or on the system side. `EnsureRequirements` will add a shuffle/sort when it is necessary, but it will not remove a shuffle/sort added explicitly by users (`DISTRIBUTE BY`/`SORT BY` in SQL, `repartitionByRange`/`orderBy` in the DataFrame API, etc.). Users can choose to drop those `repartitionByRange`/`orderBy` calls in the query themselves to save the shuffle/sort when they are unnecessary. E.g. we can get a more complicated case if the user doesn't do the right thing: `spark.range(1, 100).repartitionByRange(10, $"id".desc).repartitionByRange(10, $"id").orderBy($"id")`. Should we also handle these cases?
I vaguely remember that removing the redundant shuffle exchange/sort explicitly added by users in a query was treated as a won't-fix, but I cannot find the old PR now. cc @cloud-fan. Thanks.
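For illustration, here is a minimal spark-shell sketch of that query (this sketch is mine, not from the original comment; it assumes an active session named `spark`):

```scala
import spark.implicits._

// Chained repartitionByRange/orderBy calls like the case above; explain()
// surfaces the stacked Exchange rangepartitioning nodes in the physical plan.
spark.range(1, 100)
  .repartitionByRange(10, $"id".desc)
  .repartitionByRange(10, $"id")
  .orderBy($"id")
  .explain()
```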
@c21 Thanks for the comment.
Yes, users can choose to do that, but it requires them to understand how Spark and Spark SQL work internally, and to have some distributed-computing knowledge beforehand.
Actually, this case is already handled by
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@c21 @imback82 @maropu @HyukjinKwon
```diff
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+        // Like optimizer.CollapseRepartition removes adjacent repartition operations,
+        // adjacent repartitions performed by shuffle can also be removed.
+        val newChild = if (child.isInstanceOf[ShuffleExchangeExec]) child.children.head else child
```
This reminds me of #26946. cc @cloud-fan, @maryannxue and @stczwd FYI
To avoid the case @HyukjinKwon pointed out above, it seems we need to check whether the `outputPartitioning` is the same, to narrow down the scope of this optimization.
Btw, in my opinion, to avoid complicating the `EnsureRequirements` rule further, it would be better to remove these kinds of redundant shuffles in a new rule that runs after `EnsureRequirements`, like #27096.
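For concreteness, a rough sketch of that guarded, separate-rule idea. This is an illustration, not code from the PR; the rule name `PruneSameShuffle` and the equality guard are assumptions:

```scala
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Only remove the inner exchange when the outer exchange produces the same
// outputPartitioning, which limits the optimization to the clearly safe case.
case class PruneSameShuffle() extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transform {
    case outer @ ShuffleExchangeExec(partitioning, ShuffleExchangeExec(innerPartitioning, grandchild, _), _)
        if partitioning == innerPartitioning =>
      outer.withNewChildren(grandchild :: Nil)
  }
}
```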
> it seems we need to check if outputPartitioning is the same for narrowing down this optimization scope.

Do you mean we should check whether the `outputPartitioning` of the `ShuffleExchangeExec` to be inserted matches that of the existing `ShuffleExchangeExec`? If so, that condition should already be satisfied. Just removing the existing `ShuffleExchange` and inserting a new `ShuffleExchange` whose `outputPartitioning` satisfies the required `Distribution` works, doesn't it?

> Btw, in my opinion, to avoid complicating the EnsureRequirements rule more, it would be better to remove these kinds of redundant shuffles in a new rule after EnsureRequirements like #27096.

Having a new rule sounds better.
> Do you mean we should check whether outputPartitioning of ShuffleExchangeExec to be inserted and the one of the existing ShuffleExchangeExec?

I didn't mean so; have you checked #26946? This PR currently removes shuffles incorrectly:
```
scala> spark.range(1).selectExpr("id as a").write.saveAsTable("test")
scala> sql("SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a").explain()
== Physical Plan ==
*(2) Sort [a#5L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#5L ASC NULLS FIRST, 200), true, [id=#53]
   +- Exchange RoundRobinPartitioning(5), false, [id=#52]   <--- !!! Removed? !!!
      +- *(1) ColumnarToRow
         +- FileScan parquet default.test[a#5L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/test], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint>
```
I have considered such a case, but if a shuffle/partitioning is performed immediately on top of another shuffle/partitioning, is the child shuffle/partitioning meaningful? In the example above, the result is the same whether or not the `RoundRobinPartitioning` is removed, right?
I checked #26946, and I understand that the root cause of that issue was a bug in how the parser handles hints, so the optimizer-based approach was wrong and incomplete for that issue. The solutions are similar between this PR and that PR, but the issues are different: this PR focuses only on removing redundant shuffles.
I might be misunderstanding what you pointed out and that PR; please correct me if my understanding is wrong.
Ah, I got your point, and it makes sense. Could you update the PR description? The current one only describes the same-output-partitioning cases.
Yeah, I've updated. Thanks.
```scala
case class PruneShuffle() extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan = plan.transform {
    case op @ ShuffleExchangeExec(_, child: ShuffleExchangeExec, _) =>
      op.withNewChildren(Seq(pruneShuffle(child)))
    case other => other
  }

  private def pruneShuffle(plan: SparkPlan): SparkPlan = {
    plan match {
      case shuffle: ShuffleExchangeExec =>
        pruneShuffle(shuffle.child)
      case other => other
    }
  }
}
```
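One behavioral note on the rule above: because `pruneShuffle` recurses, a chain of exchanges of any length collapses down to the single outermost one. A toy model with plain Scala stand-ins (not the real Spark classes) traces this:

```scala
// Toy stand-ins for SparkPlan/ShuffleExchangeExec, just to trace the recursion.
sealed trait Plan
case class Exchange(partitioning: String, child: Plan) extends Plan
case class Scan(table: String) extends Plan

def pruneShuffle(plan: Plan): Plan = plan match {
  case Exchange(_, child) => pruneShuffle(child)  // drop every exchange in the chain
  case other => other
}

def prune(plan: Plan): Plan = plan match {
  case Exchange(p, child: Exchange) => Exchange(p, pruneShuffle(child))
  case other => other
}

// prune(Exchange("range", Exchange("hash", Exchange("roundrobin", Scan("t")))))
// returns Exchange("range", Scan("t")): only the outermost exchange survives.
```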
I'd propose a more concise way to write `PruneShuffle`:
```scala
case class PruneShuffle() extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    case op @ ShuffleExchangeExec(_, ShuffleExchangeExec(_, grandchild, _), _) =>
      op.withNewChildren(grandchild :: Nil)
  }
}
```
Hmm, in the current implementation at most two `ShuffleExchangeExec` nodes can be consecutive. But if chains become longer in the future, `transformUp` is not efficient.
TBH, performance-wise we are talking about a millisecond-level optimization here. I would value readability over micro-optimizations.
Similar to what is discussed here, I'd like to avoid unnecessary transformation.
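For readers outside the thread: whichever formulation wins, a rule like `PruneShuffle` is meant to run during physical-plan preparation, right after `EnsureRequirements`. A hedged sketch of that wiring follows; the rule list is illustrative, not the actual contents of Spark's `QueryExecution.preparations`:

```scala
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.internal.SQLConf

// Illustrative ordering only: EnsureRequirements is what may introduce the
// adjacent exchanges, so the pruning rule must run after it.
def preparations(conf: SQLConf): Seq[Rule[SparkPlan]] = Seq(
  EnsureRequirements(conf),
  PruneShuffle()  // the rule proposed in this PR
  // ... the remaining preparation rules ...
)
```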
cc: @cloud-fan
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?

This PR changes `EnsureRequirements` to let it remove redundant `ShuffleExchange` nodes. Normally, redundant repartition operations are removed by the `CollapseRepartition` rule, but `EnsureRequirements` can insert another `HashPartitioning` or `RangePartitioning` immediately after the repartition, leaving adjacent `ShuffleExchange` nodes in the physical plan. Even if their `outputPartitioning` values are different, those adjacent `ShuffleExchange` nodes are redundant.

An example: in this case, the lower `Exchange` for `rangepartitioning` is redundant.

Another example: in this case, the lower `Exchange` for `RoundRobinPartitioning` inserted by `EnsureRequirements` for the left side is not necessary (a reproduction sketch of this kind of plan appears after this description).

Why are the changes needed?

To remove unnecessary shuffles.

Does this PR introduce any user-facing change?

Yes. After this change, such redundant `ShuffleExchange` nodes will be removed.

How was this patch tested?

New tests.
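The example plans from the original description did not survive this copy, but the spark-shell reproduction maropu posted earlier in the thread shows the kind of adjacent exchanges this PR targets (plan abridged from that comment):

```scala
// In spark-shell: a user repartition hint followed by a sort produces two
// adjacent exchanges with different output partitionings.
spark.range(1).selectExpr("id as a").write.saveAsTable("test")
sql("SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a").explain()
// == Physical Plan == (abridged)
// *(2) Sort [a ASC NULLS FIRST], true, 0
// +- Exchange rangepartitioning(a ASC NULLS FIRST, 200)   <- inserted by EnsureRequirements
//    +- Exchange RoundRobinPartitioning(5)                <- the redundant lower shuffle
//       +- *(1) ColumnarToRow
//          +- FileScan parquet default.test[a]
```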