Support DayTimeIntervalType in ParquetCachedBatchSerializer [databricks] #4926
Conversation
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
build
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
build
ids=["rapids_memoryscan_on", "rapids_memoryscan_off"]) | ||
@pytest.mark.parametrize('with_rapids_reader', ['true', 'false'], | ||
ids=["rapids_reader_on", "rapids_reader_off"]) | ||
def test_cache_daytimeinterval_input_columnar(spark_tmp_path, alongside_gen, |
Why is this test method so complicated? What are we doing in this test that we aren't doing in any other test? We have a DataFrame that we cache and then pull from the cache, comparing the result from the CPU and GPU.
In other words, why don't we just do the following?
def test_cache_daytimeinterval_input_columnar():
    def func(spark):
        df = two_col_df(spark, DayTimeIntervalGen(), alongside_gen)
        df.cache().count()
        return df.selectExpr("a")  # or whatever the column name is
    assert_gpu_and_cpu_are_equal_collect(func, conf={YOUR CONF})
I could be missing something.
Since we implement all of the paths (both CPU and GPU) in the PCBS, I thought the output of every path should be equal to the original data. Comparing the GPU output to the CPU output assumes the CPU output of the PCBS is reliable and equal to the Spark output.
Maybe it is over-designed.
Reading data from Parquet and writing it back is done to get a columnar input to be cached, and then to convert the cached batches back to columnar batches.
I do the same as your suggestion in the test_cache_daytimeinterval_input_row test, but that test only checks the conversion paths between internal rows and cached batches.
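For context, a minimal sketch of that columnar-input approach (a sketch only: unary_op_df, DayTimeIntervalGen, assert_gpu_and_cpu_are_equal_collect, and the spark_tmp_path fixture are assumed from this repo's integration-test utilities, and the PR's final code may differ):

def test_cache_daytimeinterval_input_columnar(spark_tmp_path):
    # illustrative sketch; helper names are assumed, not the PR's final code
    data_path = spark_tmp_path + '/PARQUET_DATA'
    def func(spark):
        # the Parquet round-trip makes the input to the cache columnar,
        # so convertColumnarBatchToCachedBatch gets exercised
        unary_op_df(spark, DayTimeIntervalGen()).write.parquet(data_path)
        df = spark.read.parquet(data_path)
        df.cache().count()
        return df
    assert_gpu_and_cpu_are_equal_collect(func)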
Simplified this test
Reading/writing Parquet isn't necessary to write/read the columnar cache. If you look at InMemoryTableScanExec, it calls convertColumnarBatchToCachedBatch if the Spark plan supports columnar output and the serializer supports columnar input (which it always does here).
Reading the cache as columnar in PCBS depends on three conditions: the conf spark.sql.inMemoryColumnarStorage.enableVectorizedReader is enabled, the plan has 100 or fewer columns, and the plan's output types are all AtomicType or NullType.
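Roughly, that check could be written as the following predicate (a paraphrase in Python for illustration, not the actual Spark source; the name can_read_cache_columnar and the max_fields parameter are made up here):

from pyspark.sql.types import AtomicType, NullType, StructType

def can_read_cache_columnar(schema: StructType,
                            vectorized_reader_enabled: bool,
                            max_fields: int = 100) -> bool:
    # mirrors the three conditions above: the vectorized-reader conf,
    # the column-count limit, and atomic/null output types only
    return (vectorized_reader_enabled
            and len(schema.fields) <= max_fields
            and all(isinstance(f.dataType, (AtomicType, NullType))
                    for f in schema.fields))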
Good to know. Just got home and updated it.
Merged the two tests into one and updated the test function as you suggested above.
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
build
// will be reused, and Spark expects the producer to close its batches.
val numRows = batch.numRows()
val gcbBuilder = new GpuColumnarBatchBuilder(structSchema, numRows)
for (i <- 0 until batch.numCols()) {
You can use the following to improve performance:
var rowIndex = 0
while (rowIndex < batch.numRows()) {
  ......
  rowIndex += 1
}
A similar PR: #4770
The number of rows is usually quite large, so that change can improve performance there. However, this loop is over columns, so the suggestion would bring little benefit, since the number of columns is usually small.
@razajafri Could you review this again?
build
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Force-pushed from 6ccf551 to d2b8a9c
build
This PR adds support for the new DayTimeIntervalType in ParquetCachedBatchSerializer for Spark v3.3.0+, along with some tests.
closes #4148
closes #4931
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
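For context, a minimal end-to-end sketch of what this enables (assuming Spark 3.3.0+ with the RAPIDS plugin on the classpath and spark.sql.cache.serializer set to the ParquetCachedBatchSerializer shim; illustrative only):

# 'spark' is an existing SparkSession configured as described above
df = spark.sql("SELECT INTERVAL '10 02:30:00' DAY TO SECOND AS dt")
df.cache().count()  # materializes the cache through the PCBS
df.show()           # reads the DayTimeIntervalType column back from the cache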