A nonblocking `GpuSemaphore` API would be very useful for deciding whether it is a good time to offload some work to the host side.
For instance, we could decode (and decompress) Parquet file buffers on the host side when no GPU resource is available at that moment. (I am working on a PoC of this feature.) The entry point of the demo implementation looks like this:
```scala
// When host-side reading is enabled, try to acquire the GPU semaphore without
// blocking; if it is not available right now, decode on the host instead.
// Otherwise, block until the semaphore is acquired and read on the GPU as usual.
val hostSideRead = if (enableHostSideRead) {
  !GpuSemaphore.tryAcquireIfNecessary(TaskContext.get())
} else {
  GpuSemaphore.acquireIfNecessary(TaskContext.get())
  false
}
RmmRapidsRetryIterator.withRetry(hostBuffer, splitBatchSizePolicy) { _ =>
  // The MakeParquetTableProducer will close the input buffer, and that would be bad
  // because we don't want to close it until we know that we are done with it
  hostBuffer.incRefCount()
  val tableReader = if (hostSideRead) {
    new VectorizedParquetGpuProducer(conf, currentTargetBatchSize.toInt,
      hostBuffer, 0, dataSize, metrics,
      dateRebaseMode, timestampRebaseMode, hasInt96Timestamps,
      clippedSchema, readDataSchema)
  } else {
    MakeParquetTableProducer(useChunkedReader, conf, currentTargetBatchSize,
      parseOpts,
      hostBuffer, 0, dataSize, metrics,
      dateRebaseMode, timestampRebaseMode, hasInt96Timestamps,
      isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files,
      debugDumpPrefix, debugDumpAlways)
  }
  val batchIter = CachedGpuBatchIterator(tableReader, colTypes)
  if (allPartValues.isDefined) {
    val allPartInternalRows = allPartValues.get.map(_._2)
    val rowsPerPartition = allPartValues.get.map(_._1)
    new GpuColumnarBatchWithPartitionValuesIterator(batchIter, allPartInternalRows,
      rowsPerPartition, partitionSchema, maxGpuColumnSizeBytes)
  } else {
    // this is a bit weird, we don't have number of rows when allPartValues isn't
    // filled in so can't use GpuColumnarBatchWithPartitionValuesIterator
    batchIter.flatMap { batch =>
      // we have to add partition values here for this batch, we already verified that
      // it's not different for all the blocks in this batch
      BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(batch,
        partedFile.partitionValues, partitionSchema, maxGpuColumnSizeBytes)
    }
  }
}.flatten
```
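The decision at the top of the demo can be isolated into a small, self-contained sketch. It uses `java.util.concurrent.Semaphore` as a stand-in for the GPU semaphore, so it says nothing about how spark-rapids actually tracks GPU permits; `HostOrGpuDispatch`, `choosePath`, `HostRead`, and `GpuRead` are hypothetical names used only for illustration.

```scala
import java.util.concurrent.Semaphore

// Hypothetical stand-in: a plain java.util.concurrent.Semaphore whose permits
// represent how many tasks may use the GPU concurrently.
object HostOrGpuDispatch {
  sealed trait ReadPath
  case object HostRead extends ReadPath
  case object GpuRead extends ReadPath

  // Mirrors the shape of the demo: with host-side reads enabled, try a
  // nonblocking acquire and fall back to the host when no permit is free;
  // with host-side reads disabled, block until a permit is available.
  def choosePath(sem: Semaphore, enableHostSideRead: Boolean): ReadPath =
    if (enableHostSideRead) {
      if (sem.tryAcquire()) GpuRead else HostRead
    } else {
      sem.acquire() // blocks until a permit is available
      GpuRead
    }

  def main(args: Array[String]): Unit = {
    val sem = new Semaphore(1)
    println(choosePath(sem, enableHostSideRead = true)) // GpuRead: the permit was free
    println(choosePath(sem, enableHostSideRead = true)) // HostRead: the permit is held
    sem.release()
    println(choosePath(sem, enableHostSideRead = true)) // GpuRead again
  }
}
```

On the GPU path the caller ends up holding a permit that must eventually be released, while the host path holds nothing. That asymmetry matters: releasing after a failed `tryAcquire` would silently add an extra permit to a `java.util.concurrent.Semaphore`.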
Is your feature request related to a problem? Please describe.
It would be great if `GpuSemaphore` supported a nonblocking acquire API.
The PoC I am working on lives in this branch:
https://github.com/sperlingxx/spark-rapids/tree/cpu_parquet_decomp
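To make the requested semantics concrete, here is a minimal sketch of the two acquire flavors on a toy, single-JVM semaphore. `ToyTaskSemaphore`, its `releaseIfNecessary` method, and the `Long` task id standing in for `TaskContext` are all hypothetical; this is not the spark-rapids `GpuSemaphore`, whose real internals may differ. The assumption is only that each task holds at most one permit, so `acquireIfNecessary` blocks while `tryAcquireIfNecessary` returns `false` immediately when no permit is free.

```scala
import java.util.concurrent.Semaphore
import scala.collection.mutable

// Hypothetical toy, not the spark-rapids GpuSemaphore: each task holds at most
// one permit, so acquiring again from the same task is a no-op ("IfNecessary").
// Assumes each task id is used from a single thread, as a Spark task would be.
final class ToyTaskSemaphore(permits: Int) {
  private val sem = new Semaphore(permits)
  private val holders = mutable.Set.empty[Long]

  // Blocking variant: waits for a permit unless the task already holds one.
  def acquireIfNecessary(taskId: Long): Unit =
    if (!isHeld(taskId)) {
      sem.acquire()
      markHeld(taskId)
    }

  // Nonblocking variant: true if the task holds a permit afterwards, false if
  // none was free right now (the caller could then do the work on the host).
  def tryAcquireIfNecessary(taskId: Long): Boolean =
    isHeld(taskId) || {
      val acquired = sem.tryAcquire()
      if (acquired) markHeld(taskId)
      acquired
    }

  // Releases the task's permit only if it actually holds one.
  def releaseIfNecessary(taskId: Long): Unit = {
    val removed = holders.synchronized { holders.remove(taskId) }
    if (removed) sem.release()
  }

  private def isHeld(taskId: Long): Boolean =
    holders.synchronized { holders.contains(taskId) }

  private def markHeld(taskId: Long): Unit =
    holders.synchronized { holders.add(taskId); () }
}

object ToyTaskSemaphoreDemo {
  def main(args: Array[String]): Unit = {
    val gpu = new ToyTaskSemaphore(permits = 1)
    println(gpu.tryAcquireIfNecessary(1L)) // true: task 1 gets the only permit
    println(gpu.tryAcquireIfNecessary(2L)) // false: task 2 could decode on the host
    gpu.releaseIfNecessary(1L)
  }
}
```

Tracking holders per task keeps a repeated acquire from leaking permits, and releasing only when the task is recorded as a holder keeps a failed `tryAcquireIfNecessary` from inflating the permit count.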