
[SPARK-32820][SQL] Remove redundant shuffle exchanges inserted by EnsureRequirements #29677

Closed · wants to merge 10 commits
@@ -52,7 +52,10 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
+        // Just as optimizer.CollapseRepartition removes adjacent repartition operations,
+        // adjacent repartitions performed by shuffle can also be removed.
+        val newChild = if (child.isInstanceOf[ShuffleExchangeExec]) child.children.head else child
Member:

Could you add some comments about why we can remove it?

Member:

This reminds me of #26946. cc @cloud-fan, @maryannxue and @stczwd FYI

Member:

To avoid the case @HyukjinKwon pointed out above, it seems we need to check whether outputPartitioning is the same, to narrow down the scope of this optimization.

Btw, in my opinion, to avoid complicating the EnsureRequirements rule further, it would be better to remove these kinds of redundant shuffles in a new rule that runs after EnsureRequirements, like #27096.
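
For illustration, a minimal sketch of what such a standalone rule might look like (hypothetical code, not from this PR; the rule name is made up, and whether blindly collapsing stacked shuffles is always safe is exactly what is discussed below):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Hypothetical rule that could run after EnsureRequirements: a shuffle stacked
// directly on another shuffle re-partitions the data anyway, so the inner
// exchange does no useful work and is dropped.
object RemoveRedundantShuffleExchange extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case parent: ShuffleExchangeExec =>
      parent.child match {
        case inner: ShuffleExchangeExec => parent.withNewChildren(inner.child :: Nil)
        case _ => parent
      }
  }
}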

@sarutak (Member, Author) on Sep 29, 2020:

> it seems we need to check whether outputPartitioning is the same, to narrow down the scope of this optimization.

Do you mean we should check whether the outputPartitioning of the ShuffleExchangeExec to be inserted matches that of the existing ShuffleExchangeExec?
If that is what you mean, it should already satisfy that condition.

Just removing the existing ShuffleExchange and inserting a new ShuffleExchange whose outputPartitioning satisfies the required Distribution works, doesn't it?

> Btw, in my opinion, to avoid complicating the EnsureRequirements rule further, it would be better to remove these kinds of redundant shuffles in a new rule that runs after EnsureRequirements, like #27096.

Having a new rule sounds better.

Member:

> Do you mean we should check whether the outputPartitioning of the ShuffleExchangeExec to be inserted matches that of the existing ShuffleExchangeExec?

I didn't mean that; have you checked #26946? This PR currently removes shuffles incorrectly:

scala> spark.range(1).selectExpr("id as a").write.saveAsTable("test")
scala> sql("SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a").explain()
== Physical Plan ==
*(2) Sort [a#5L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#5L ASC NULLS FIRST, 200), true, [id=#53]
   +- Exchange RoundRobinPartitioning(5), false, [id=#52] <--- !!! Removed? !!!
      +- *(1) ColumnarToRow
         +- FileScan parquet default.test[a#5L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/test], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint>

@sarutak (Member, Author) on Sep 29, 2020:

I have considered such a case, but if a shuffle/partitioning is performed directly on top of another shuffle/partitioning as its immediate child, is the child shuffle/partitioning meaningful?
In the example above, the result is the same whether the RoundRobinPartitioning is removed or not, right?

I checked #26946, and I understand that the root cause of that issue was a bug in how hints are handled in the parser, so the optimizer-based approach was wrong and incomplete for that issue.

The solutions in this PR and that PR are similar, but the issues are different. This PR just focuses on removing redundant shuffles.

I might be misunderstanding your point and that PR; please correct me if my understanding is wrong.
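
For reference, a quick way to sanity-check this claim in spark-shell (a sketch; it assumes the `test` table created in the example above):

// Because ORDER BY adds a range shuffle on top, the sorted result is the same
// whether the round-robin shuffle from the REPARTITION(5) hint is kept or removed.
val withHint = sql("SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a").collect()
val withoutHint = sql("SELECT * FROM test ORDER BY a").collect()
assert(withHint.sameElements(withoutHint))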

Member:

Ah, I got your point and it makes sense. Could you update the PR description? The current one only describes the cases where the output partitioning is the same.

@sarutak (Member, Author):

Yeah, I've updated it. Thanks.

+        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), newChild)
       }

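To illustrate the change to EnsureRequirements above (a spark-shell sketch that mirrors the new test added below; `value` is the default column name produced by toDF): the user's repartition creates one shuffle, and the join then requires hash partitioning on a different expression, so EnsureRequirements would otherwise stack a second shuffle directly on top of the first.

// Disable broadcast joins so a shuffled join is planned.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
val left = Seq(1, 2, 3).toDF.repartition(10, $"value")
val right = Seq(1, 2, 3).toDF
// With this patch, the 10-partition exchange under `left` is removed;
// only the exchanges required by the join remain in the final plan.
left.join(right, left("value") + 1 === right("value")).explain()
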
@@ -1001,6 +1001,38 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     val numPartitions = range.rdd.getNumPartitions
     assert(numPartitions == 0)
   }
+
+  test("SPARK-32820: Remove redundant shuffle exchange") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "200") {
+        val ordered = spark.range(1, 100).repartitionByRange(10, $"id".desc).orderBy($"id")
+        val orderedPlan = ordered.queryExecution.executedPlan
+        val exchangesInOrdered =
+          orderedPlan.collect { case s: ShuffleExchangeExec => s }
+        assert(exchangesInOrdered.size == 1)
+
+        val partitioning = exchangesInOrdered.head.outputPartitioning
+        assert(partitioning.numPartitions == 200)
+        assert(partitioning.isInstanceOf[RangePartitioning])
+
+        val left = Seq(1, 2, 3).toDF.repartition(10, $"value")
+        val right = Seq(1, 2, 3).toDF
+        val joined = left.join(right, left("value") + 1 === right("value"))
+        val joinedPlan = joined.queryExecution.executedPlan
+        val exchangesInJoined =
+          joinedPlan.collect { case s: ShuffleExchangeExec => s }
+        assert(exchangesInJoined.size == 2)
+
+        val leftPartitioning = exchangesInJoined(0).outputPartitioning
+        assert(leftPartitioning.numPartitions == 200)
+        assert(leftPartitioning.isInstanceOf[HashPartitioning])
+
+        val rightPartitioning = exchangesInJoined(1).outputPartitioning
+        assert(rightPartitioning.numPartitions == 200)
+        assert(rightPartitioning.isInstanceOf[HashPartitioning])
+      }
+    }
+  }
 }

 // Used for unit-testing EnsureRequirements