diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala index caf323ec053..3f3f2803f5c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala @@ -20,7 +20,7 @@ import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.python.PythonWorkerSemaphore import com.nvidia.spark.rapids.shims.ShimUnaryExecNode @@ -194,23 +194,14 @@ case class GpuAggregateInPandasExec( } val batchProducer = new BatchProducer( - BatchGroupedIterator(miniIter, miniAttrs, groupingRefs.indices)) - val queue = new BatchQueue(batchProducer, Some(keyConverter)) - val pyInputIter = batchProducer.asIterator.map { case (batch, isForPeek) => - val inputBatch = closeOnExcept(batch) { _ => + BatchGroupedIterator(miniIter, miniAttrs, groupingRefs.indices), Some(keyConverter)) + val pyInputIter = batchProducer.asIterator.map { batch => + withResource(batch) { _ => val pyInputColumns = pyInputRefs.indices.safeMap { idx => batch.column(idx + groupingRefs.size).asInstanceOf[GpuColumnVector].incRefCount() } new ColumnarBatch(pyInputColumns.toArray, batch.numRows()) } - if (isForPeek) { - batch.close() - } else { - // When adding batch to the queue, queue will convert it to a key batch because this - // queue is constructed with the key converter. - queue.add(batch) - } - inputBatch } // Third, sends to Python to execute the aggregate and returns the result. 
@@ -232,8 +223,8 @@ case class GpuAggregateInPandasExec( val combinedAttrs = gpuGroupingExpressions.map(_.toAttribute) ++ pyOutAttributes val resultRefs = GpuBindReferences.bindGpuReferences(resultExprs, combinedAttrs) // Gets the combined batch for each group and projects for the output. - new CombiningIterator(queue, pyOutputIterator, pyRunner, mNumOutputRows, - mNumOutputBatches).map { combinedBatch => + new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator, pyRunner, + mNumOutputRows, mNumOutputBatches).map { combinedBatch => withResource(combinedBatch) { batch => GpuProjectExec.project(batch, resultRefs) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala index 60b6b3929e1..5e588cae7bd 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala @@ -171,29 +171,56 @@ class RebatchingRoundoffIterator( } /** - * Work with BatchQueue to support BatchQueue's peek operation by pulling - * in a batch from the input iterator on demand. + * A trait provides dedicated APIs for GPU reading batches from python. + * This is also for easy type declarations since it is implemented by an inner class + * of BatchProducer. + */ +trait BatchQueue { + /** Return and remove the first batch in the cache. Caller should close it. */ + def remove(): SpillableColumnarBatch + + /** Get the number of rows in the next batch, without actually getting the batch. */ + def peekBatchNumRows(): Int +} + +/** + * It accepts an iterator as input and will cache the batches when pulling them in from + * the input for later combination with batches coming back from python by the reader. 
+ * It also supports an optional converter to convert input batches and put the converted + * result to the cache queue. This is for GpuAggregateInPandas to build and cache key + * batches. * - * It also supports accessing batches from the input by an iterator. Call - * "asIterator" to get the iterator. This iterator will return a tuple of - * ColumnarBatch and Boolean. And the boolean indicates whether the batch - * is pulled in for peak. + * Call "getBatchQueue" to get the internal cache queue and specify it to the output + * combination iterator. + * To access the batches from input, call "asIterator" to get the output iterator. */ -class BatchProducer(input: Iterator[ColumnarBatch]) extends AutoCloseable { producer => +class BatchProducer( + input: Iterator[ColumnarBatch], + converter: Option[ColumnarBatch => ColumnarBatch] = None +) extends AutoCloseable { producer => Option(TaskContext.get()).foreach(onTaskCompletion(_)(close())) - // Cache for batches pulled in by the "produce" call for the peek operation. - // In fact, there is usually only one batch. But using a queue here is because in + // A queue that holds the pending batches that need to line up with and combined + // with batches coming back from python. + private[this] val batchQueue = new BatchQueueImpl + + /** Get the internal BatchQueue */ + def getBatchQueue: BatchQueue = batchQueue + + // The cache that holds the pending batches pulled in by the "produce" call for + // the reader peeking the next rows number when the "batchQueue" is empty, and + // consumed by the iterator returned from "asIterator". + // (In fact, there is usually only ONE batch. But using a queue here is because in // theory "produce" can be called multiple times, then more than one batch can be - // pulled in. - private val pending = mutable.Queue[SpillableColumnarBatch]() + // pulled in.) 
+ private[this] val pendingOutput = mutable.Queue[SpillableColumnarBatch]() - private[rapids] def produce(): ColumnarBatch = producer.synchronized { + private def produce(): ColumnarBatch = { if (input.hasNext) { val cb = input.next() // Need to duplicate this batch for "next" - pending.enqueue(SpillableColumnarBatch(GpuColumnVector.incRefCounts(cb), + pendingOutput.enqueue(SpillableColumnarBatch(GpuColumnVector.incRefCounts(cb), SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) cb } else { @@ -201,113 +228,94 @@ class BatchProducer(input: Iterator[ColumnarBatch]) extends AutoCloseable { prod } } - def asIterator: Iterator[(ColumnarBatch, Boolean)] = { - new Iterator[(ColumnarBatch, Boolean)] { + /** Return an iterator to access the batches from the input */ + def asIterator: Iterator[ColumnarBatch] = { + new Iterator[ColumnarBatch] { override def hasNext: Boolean = producer.synchronized { - pending.nonEmpty || input.hasNext + pendingOutput.nonEmpty || input.hasNext } - override def next(): (ColumnarBatch, Boolean) = producer.synchronized { + override def next(): ColumnarBatch = producer.synchronized { if (!hasNext) { throw new NoSuchElementException() } - if (pending.nonEmpty) { - withResource(pending.dequeue()) { scb => - (scb.getColumnarBatch(), true) + if (pendingOutput.nonEmpty) { + withResource(pendingOutput.dequeue()) { scb => + scb.getColumnarBatch() } } else { - (input.next(), false) + closeOnExcept(input.next()) { cb => + // Need to duplicate it for later combination with Python output + batchQueue.add(GpuColumnVector.incRefCounts(cb)) + cb + } } } } } - override def close(): Unit = synchronized { - while(pending.nonEmpty) { - pending.dequeue().close() + override def close(): Unit = producer.synchronized { + batchQueue.close() + while (pendingOutput.nonEmpty) { + pendingOutput.dequeue().close() } } -} - -/** - * A simple queue that holds the pending batches that need to line up with - * and combined with batches coming back from python. 
- * - * It will ask for a batch from "batchProducer" when peeking the rows number - * and the queue is empty. - * It also supports an optional converter to convert the input batch and save - * the converted batch. This is design for the GpuAggregateInPandasExec to save - * the group key instead of the original input batch. - */ -class BatchQueue( - batchProducer: BatchProducer, - converter: Option[ColumnarBatch => ColumnarBatch] = None -) extends AutoCloseable { - - assert(batchProducer != null, "BatchQueue requires a BatchProducer") - Option(TaskContext.get()).foreach(onTaskCompletion(_)(close())) - - private val queue = mutable.ArrayBuffer[SpillableColumnarBatch]() - - private[this] def convertIfAny(batch: ColumnarBatch): ColumnarBatch = { - converter.map { convert => - withResource(batch)(convert) - }.getOrElse(batch) - } - /** Add a batch to the queue, the input batch will be taken over, do not use it anymore */ - def add(batch: ColumnarBatch): Unit = { - val cb = convertIfAny(batch) - this.synchronized { - queue.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + // Put this batch queue inside the BatchProducer to share the same lock with the + // output iterator returned by "asIterator" and make sure the batch movement from + // input iterator to this queue is an atomic operation. + // In a two-threaded Python runner, using two locks to protect the batch pulling + // from the input and the batch queue separately cannot ensure batches in the + // queue have the same order as they are pulled in from the input, because there is + // a race when the reader and the writer append batches to the queue. + // One possible case is: + // 1) the writer thread gets a batch A, but next it pauses. + // 2) then the reader thread gets the next batch B, and appends it to the queue. + // 3) the writer thread resumes and appends batch A to the queue. 
+ // Therefore, batch A and B have the reversed order in the queue now, leading to data + // corruption when doing the combination. + private class BatchQueueImpl extends BatchQueue with AutoCloseable { + private val queue = mutable.Queue[SpillableColumnarBatch]() + + /** Add a batch to the queue, the input batch will be taken over, do not use it anymore */ + private[python] def add(batch: ColumnarBatch): Unit = { + val cb = converter.map { convert => + withResource(batch)(convert) + }.getOrElse(batch) + queue.enqueue(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) } - } - /** Return and remove the first batch in the cache. */ - def remove(): SpillableColumnarBatch = synchronized { - if (queue.isEmpty) { - null - } else { - queue.remove(0) + /** Return and remove the first batch in the cache. Caller should close it */ + override def remove(): SpillableColumnarBatch = producer.synchronized { + if (queue.isEmpty) { + null + } else { + queue.dequeue() + } } - } - /** Get the number of rows in the next batch, without actually getting the batch. */ - def peekBatchNumRows(): Int = { - val isEmpty = this.synchronized { - queue.isEmpty - } - if (isEmpty) { - // Try to ask for the next batch instead of waiting for inserting a - // batch by the python runner's writing. Because the writing may - // happen after this peak in the single threaded python runner, leading - // to a hang. - // Do not call it inside a lock to avoid any dead lock. - val nextBatch = batchProducer.produce() - if (nextBatch != null) { - val cb = convertIfAny(nextBatch) - this.synchronized { - // Since we release the lock for some time, it is possible some batches - // have been added into the queue. Then we need to make sure this batch - // is the first one. - queue.insert(0, SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + /** Get the number of rows in the next batch, without actually getting the batch. 
*/ + override def peekBatchNumRows(): Int = producer.synchronized { + // Try to pull in the next batch for peek + if (queue.isEmpty) { + val cb = produce() + if (cb != null) { + add(cb) } } - } - this.synchronized { if (queue.nonEmpty) { queue.head.numRows() } else { 0 // Should not go here but just in case. } } - } - override def close(): Unit = synchronized { - while (queue.nonEmpty) { - queue.remove(0).close() + override def close(): Unit = producer.synchronized { + while (queue.nonEmpty) { + queue.dequeue().close() + } } } } @@ -399,19 +407,8 @@ case class GpuArrowEvalPythonExec( val batchProducer = new BatchProducer( new RebatchingRoundoffIterator(iter, inputSchema, targetBatchSize, numInputRows, numInputBatches)) - val queue = new BatchQueue(batchProducer) - val pyInputIterator = batchProducer.asIterator.map { case (batch, isForPeek) => - // We have to do the project before we add the batch because the batch might be closed - // when it is added - val ret = closeOnExcept(batch)(GpuProjectExec.project(_, boundReferences)) - if (isForPeek) { - batch.close() - } else { - // We only add the batch that is not for peek, because the batch for peek is already - // added by the reader when peeking the next rows number. 
- queue.add(batch) - } - ret + val pyInputIterator = batchProducer.asIterator.map { batch => + withResource(batch)(GpuProjectExec.project(_, boundReferences)) } if (isPythonOnGpuEnabled) { @@ -431,7 +428,7 @@ case class GpuArrowEvalPythonExec( pythonOutputSchema) val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context) - new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows, + new CombiningIterator(batchProducer.getBatchQueue, outputIterator, pyRunner, numOutputRows, numOutputBatches) } else { // Empty partition, return it directly diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala index 12e2258aaaf..ab56a0b24b5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf import ai.rapids.cudf.{GroupByAggregation, NullPolicy, OrderByArg} import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.python.PythonWorkerSemaphore @@ -505,24 +505,13 @@ trait GpuWindowInPandasExecBase extends ShimUnaryExecNode with GpuPythonExecBase val boundPartitionRefs = GpuBindReferences.bindGpuReferences(gpuPartitionSpec, childOutput) val batchProducer = new BatchProducer( new GroupingIterator(inputIter, boundPartitionRefs, numInputRows, numInputBatches)) - val queue = new BatchQueue(batchProducer) - val pyInputIterator = batchProducer.asIterator.map { case (batch, isForPeek) => - // We 
have to do the project before we add the batch because the batch might be closed - // when it is added - val inputBatch = closeOnExcept(batch) { _ => + val pyInputIterator = batchProducer.asIterator.map { batch => + withResource(batch) { _ => withResource(GpuProjectExec.project(batch, boundDataRefs)) { projectedCb => // Compute the window bounds and insert to the head of each row for one batch insertWindowBounds(projectedCb) } } - if (isForPeek) { - batch.close() - } else { - // We only add the batch that is not for peek, because the batch for peek is already - // added by the reader when peeking the next rows number. - queue.add(batch) - } - inputBatch } if (isPythonOnGpuEnabled) { @@ -543,8 +532,8 @@ trait GpuWindowInPandasExecBase extends ShimUnaryExecNode with GpuPythonExecBase pythonOutputSchema) val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context) - new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows, - numOutputBatches).map(projectResult) + new CombiningIterator(batchProducer.getBatchQueue, outputIterator, pyRunner, + numOutputRows, numOutputBatches).map(projectResult) } else { // Empty partition, return the input iterator directly inputIter diff --git a/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala b/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala index 3d0d3450320..988cbe2521c 100644 --- a/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala +++ b/sql-plugin/src/main/spark321db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala @@ -25,7 +25,7 @@ package com.nvidia.spark.rapids.shims import scala.collection.mutable.ArrayBuffer import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.python.PythonWorkerSemaphore import 
org.apache.spark.TaskContext @@ -33,7 +33,7 @@ import org.apache.spark.api.python.PythonEvalType import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.rapids.execution.python.{BatchProducer, BatchQueue, CombiningIterator, GpuPythonHelper, GpuWindowInPandasExecBase, GroupingIterator} +import org.apache.spark.sql.rapids.execution.python.{BatchProducer, CombiningIterator, GpuPythonHelper, GpuWindowInPandasExecBase, GroupingIterator} import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} @@ -199,24 +199,13 @@ case class GpuWindowInPandasExec( val boundPartitionRefs = GpuBindReferences.bindGpuReferences(gpuPartitionSpec, childOutput) val batchProducer = new BatchProducer( new GroupingIterator(inputIter, boundPartitionRefs, numInputRows, numInputBatches)) - val queue = new BatchQueue(batchProducer) - val pyInputIterator = batchProducer.asIterator.map { case (batch, isForPeek) => - // We have to do the project before we add the batch because the batch might be closed - // when it is added - val inputBatch = closeOnExcept(batch) { _ => + val pyInputIterator = batchProducer.asIterator.map { batch => + withResource(batch) { _ => withResource(GpuProjectExec.project(batch, boundDataRefs)) { projectedCb => // Compute the window bounds and insert to the head of each row for one batch insertWindowBounds(projectedCb) } } - if (isForPeek) { - batch.close() - } else { - // We only add the batch that is not for peek, because the batch for peek is already - // added by the reader when peeking the next rows number. 
- queue.add(batch) - } - inputBatch } if (isPythonOnGpuEnabled) { @@ -237,8 +226,8 @@ case class GpuWindowInPandasExec( pythonOutputSchema) val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context) - new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows, - numOutputBatches).map(projectResult) + new CombiningIterator(batchProducer.getBatchQueue, outputIterator, pyRunner, + numOutputRows, numOutputBatches).map(projectResult) } else { // Empty partition, return the input iterator directly inputIter