Semaphore optimization in scan
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
firestarman committed Apr 26, 2024
1 parent e0104bb commit 71c8166
Showing 2 changed files with 11 additions and 3 deletions.
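
The change applies one pattern in all three readers: a task releases the GPU semaphore before the IO-bound part of a read and takes it back only when GPU processing is about to begin, so other tasks can use the GPU while this one waits on storage. Below is a minimal sketch of that pattern, not the plugin code itself; GpuSemaphore.releaseIfNecessary/acquireIfNecessary and TaskContext are the real APIs visible in the diff, while readHostData and decodeOnGpu are hypothetical stand-ins for the IO and GPU phases of a scan.

    import org.apache.spark.TaskContext
    import com.nvidia.spark.rapids.GpuSemaphore

    object SemaphorePatternSketch {
      // Hypothetical stand-ins for the two phases of a scan: buffering
      // bytes from storage on the host (IO-bound, no GPU involved) and
      // decoding them into a batch on the GPU.
      def readHostData(): Array[Byte] = ???
      def decodeOnGpu(hostData: Array[Byte]): Unit = ???

      def readBatchSketch(): Unit = {
        val taskContext = TaskContext.get()
        // The read starts with IO, so give the GPU up while blocked on
        // storage; releaseIfNecessary is a no-op when the task does not
        // currently hold the semaphore.
        GpuSemaphore.releaseIfNecessary(taskContext)

        val hostData = readHostData()

        // About to touch device memory, so take the semaphore back first.
        GpuSemaphore.acquireIfNecessary(taskContext)
        decodeOnGpu(hostData)
      }
    }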
@@ -639,6 +639,8 @@ abstract class MultiFileCloudPartitionReaderBase(
         return true
       }
 
+      // Read starts with IO operations, so release the GPU for a while.
+      GpuSemaphore.releaseIfNecessary(TaskContext.get())
       // Temporary until we get more to read
       batchIter = EmptyGpuColumnarBatchIterator
       // if we have batch left from the last file read return it
@@ -1031,6 +1033,9 @@ abstract class MultiFileCoalescingPartitionReaderBase(
   def startNewBufferRetry: Unit = ()
 
   private def readBatch(): Iterator[ColumnarBatch] = {
+    val taskContext = TaskContext.get()
+    // Read begins with IO operations, so release the GPU for a while.
+    GpuSemaphore.releaseIfNecessary(taskContext)
     withResource(new NvtxRange(s"$getFileFormatShortName readBatch", NvtxColor.GREEN)) { _ =>
       val currentChunkMeta = populateCurrentBlockChunk()
       val batchIter = if (currentChunkMeta.clippedSchema.isEmpty) {
@@ -1040,7 +1045,7 @@ abstract class MultiFileCoalescingPartitionReaderBase(
       } else {
         val rows = currentChunkMeta.numTotalRows.toInt
         // Someone is going to process this data, even if it is just a row count
-        GpuSemaphore.acquireIfNecessary(TaskContext.get())
+        GpuSemaphore.acquireIfNecessary(taskContext)
         val nullColumns = currentChunkMeta.readSchema.safeMap(f =>
           GpuColumnVector.fromNull(rows, f.dataType).asInstanceOf[SparkVector])
         val emptyBatch = new ColumnarBatch(nullColumns.toArray, rows)
@@ -2783,6 +2783,9 @@ class ParquetPartitionReader(
   }
 
   private def readBatches(): Iterator[ColumnarBatch] = {
+    val taskContext = TaskContext.get()
+    // Read starts with IO operations, so release the GPU for a while.
+    GpuSemaphore.releaseIfNecessary(taskContext)
     withResource(new NvtxRange("Parquet readBatch", NvtxColor.GREEN)) { _ =>
       val currentChunkedBlocks = populateCurrentBlockChunk(blockIterator,
         maxReadBatchSizeRows, maxReadBatchSizeBytes, readDataSchema)
@@ -2793,7 +2796,7 @@ class ParquetPartitionReader(
         EmptyGpuColumnarBatchIterator
       } else {
         // Someone is going to process this data, even if it is just a row count
-        GpuSemaphore.acquireIfNecessary(TaskContext.get())
+        GpuSemaphore.acquireIfNecessary(taskContext)
         val nullColumns = readDataSchema.safeMap(f =>
           GpuColumnVector.fromNull(numRows, f.dataType).asInstanceOf[SparkVector])
         new SingleGpuColumnarBatchIterator(new ColumnarBatch(nullColumns.toArray, numRows))
@@ -2812,7 +2815,7 @@ class ParquetPartitionReader(
           CachedGpuBatchIterator(EmptyTableReader, colTypes)
         } else {
           // about to start using the GPU
-          GpuSemaphore.acquireIfNecessary(TaskContext.get())
+          GpuSemaphore.acquireIfNecessary(taskContext)
 
           RmmRapidsRetryIterator.withRetryNoSplit(dataBuffer) { _ =>
             // Inc the ref count because MakeParquetTableProducer will try to close the dataBuffer
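
A side note on the refactor visible above: each modified method now captures TaskContext.get() once in a local taskContext val and passes it to the later acquireIfNecessary calls instead of re-reading the thread-local each time. TaskContext.get() returns the same context for the lifetime of the task's thread, so behavior is unchanged; the local val avoids repeated thread-local lookups and makes it explicit that the release at the top of the method and the acquires further down refer to the same task.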
