Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Aug 10, 2016
1 parent 1348ba7 commit 8444447
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
* Finds scalar subquery expressions in this plan node and starts evaluating them.
*/
protected def prepareSubqueries(): Unit = {
val allSubqueries = expressions.flatMap(_.collect { case e: ExecSubqueryExpression => e })
allSubqueries.foreach {
case e: ExecSubqueryExpression =>
e.plan.prepare()
runningSubqueries += e
expressions.foreach {
_.collect {
case e: ExecSubqueryExpression =>
e.plan.prepare()
runningSubqueries += e
}
}
}

Expand All @@ -162,7 +163,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
protected def waitForSubqueries(): Unit = synchronized {
// fill in the result of subqueries
runningSubqueries.foreach { sub =>
sub.updateResult(sub.plan.executeCollect())
sub.updateResult()
}
runningSubqueries.clear()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ trait ExecSubqueryExpression extends SubqueryExpression {
override def plan: SparkPlan = executedPlan

/**
* Fill the expression with result from subquery.
* Fill the expression with collected result from executed plan.
*/
def updateResult(rows: Array[InternalRow]): Unit
def updateResult(): Unit
}

/**
Expand Down Expand Up @@ -76,7 +76,8 @@ case class ScalarSubquery(
@volatile private var result: Any = null
@volatile private var updated: Boolean = false

def updateResult(rows: Array[InternalRow]): Unit = {
def updateResult(): Unit = {
val rows = plan.executeCollect()
if (rows.length > 1) {
sys.error(s"more than one row returned by a subquery used as an expression:\n${plan}")
}
Expand Down Expand Up @@ -125,7 +126,8 @@ case class InSubquery(
case _ => false
}

def updateResult(rows: Array[InternalRow]): Unit = {
def updateResult(): Unit = {
val rows = plan.executeCollect()
result = rows.map(_.get(0, child.dataType)).asInstanceOf[Array[Any]]
updated = true
}
Expand Down

0 comments on commit 8444447

Please # to comment.