Skip to content

Commit

Permalink
[SNAP-1083] fix numBuckets handling (#15)
Browse files Browse the repository at this point in the history
- don't apply numBuckets in Shuffle partitioning since Shuffle cannot create
  a compatible partitioning with matching numBuckets (only numPartitions)
- check numBuckets too in HashPartitioning compatibility
  • Loading branch information
Sumedh Wale authored Oct 17, 2016
1 parent 4ecd5b4 commit 0ea0048
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ case class OrderlessHashPartitioning(expressions: Seq[Expression],
* in the same partition.
*/
case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int,
numBuckets : Int = 0 ) extends Expression with Partitioning with Unevaluable {
numBuckets: Int = 0) extends Expression with Partitioning with Unevaluable {

override def children: Seq[Expression] = expressions
override def nullable: Boolean = false
Expand All @@ -311,12 +311,14 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int,
}

override def compatibleWith(other: Partitioning): Boolean = other match {
case o: HashPartitioning => this.semanticEquals(o)
case o: HashPartitioning =>
this.numBuckets == o.numBuckets && this.semanticEquals(o)
case _ => false
}

override def guarantees(other: Partitioning): Boolean = other match {
case o: HashPartitioning => this.semanticEquals(o)
case o: HashPartitioning =>
this.numBuckets == o.numBuckets && this.semanticEquals(o)
case _ => false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,20 +216,10 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
// number of partitions. Otherwise, we use maxChildrenNumPartitions.
if (shufflesAllChildren) defaultNumPreShufflePartitions else maxChildrenNumPartitions
}
val numBuckets = {
children.map(child => {
if (child.outputPartitioning.isInstanceOf[OrderlessHashPartitioning]) {
child.outputPartitioning.asInstanceOf[OrderlessHashPartitioning].numBuckets
}
else {
0
}
}).reduceLeft(_ max _)
}
children.zip(requiredChildDistributions).map {
case (child, distribution) =>
val targetPartitioning = createPartitioning(distribution,
numPartitions, numBuckets)
numPartitions)
if (child.outputPartitioning.guarantees(targetPartitioning)) {
child
} else {
Expand Down

0 comments on commit 0ea0048

Please # to comment.