From 0f78bbabfdf4b01fcc15d67240751a625fad12ff Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Tue, 11 Oct 2016 18:51:52 +0530 Subject: [PATCH] [SNAP-1083] fix numBuckets handling - don't apply numBuckets in Shuffle partitioning since Shuffle cannot create a compatible partitioning with matching numBuckets (only numPartitions) - check numBuckets too in HashPartitioning compatibility --- .../sql/catalyst/plans/physical/partitioning.scala | 8 +++++--- .../sql/execution/exchange/EnsureRequirements.scala | 12 +----------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 48f6edcf4ef2a..6bc140aa9aefe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -297,7 +297,7 @@ case class OrderlessHashPartitioning(expressions: Seq[Expression], * in the same partition. */ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int, - numBuckets : Int = 0 ) extends Expression with Partitioning with Unevaluable { + numBuckets: Int = 0) extends Expression with Partitioning with Unevaluable { override def children: Seq[Expression] = expressions override def nullable: Boolean = false @@ -311,12 +311,14 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int, } override def compatibleWith(other: Partitioning): Boolean = other match { - case o: HashPartitioning => this.semanticEquals(o) + case o: HashPartitioning => + this.numBuckets == o.numBuckets && this.semanticEquals(o) case _ => false } override def guarantees(other: Partitioning): Boolean = other match { - case o: HashPartitioning => this.semanticEquals(o) + case o: HashPartitioning => + this.numBuckets == o.numBuckets && this.semanticEquals(o) case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index b33dc40615351..449769821e303 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -216,20 +216,10 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { // number of partitions. Otherwise, we use maxChildrenNumPartitions. if (shufflesAllChildren) defaultNumPreShufflePartitions else maxChildrenNumPartitions } - val numBuckets = { - children.map(child => { - if (child.outputPartitioning.isInstanceOf[OrderlessHashPartitioning]) { - child.outputPartitioning.asInstanceOf[OrderlessHashPartitioning].numBuckets - } - else { - 0 - } - }).reduceLeft(_ max _) - } children.zip(requiredChildDistributions).map { case (child, distribution) => val targetPartitioning = createPartitioning(distribution, - numPartitions, numBuckets) + numPartitions) if (child.outputPartitioning.guarantees(targetPartitioning)) { child } else {