
GPU accelerate Apache Iceberg reads #5941

Merged Jul 26, 2022 (45 commits)

Conversation

@jlowe (Contributor) commented Jul 1, 2022

Closes #4817, closes #5453, and closes #5510.

Adds basic support for GPU acceleration of Apache Iceberg table reads, along with a document detailing the limitations of that support, and tests. The tests exercise the usual, generic table-reading paths but also cover features more specific to Apache Iceberg, such as time-travel reads, incremental snapshot reads, partitioning schema evolution, and row deletions and updates.
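
As an illustration of the Iceberg-specific behavior the tests exercise, a time-travel read with Iceberg 0.13.x can be expressed through the DataFrame reader's snapshot options. This is a hedged sketch, not code from this PR; the table name and id/timestamp values are placeholders.

```scala
// Assumes an existing SparkSession `spark` with the Iceberg runtime and the
// RAPIDS Accelerator on the classpath; "db.my_table" and the literal values
// below are placeholders.
val latest = spark.read.format("iceberg").load("db.my_table")

// Time-travel to a specific snapshot id.
val asOfSnapshot = spark.read
  .format("iceberg")
  .option("snapshot-id", 10963874102873L)
  .load("db.my_table")

// Time-travel to the snapshot current as of a timestamp (millis since epoch).
val asOfTime = spark.read
  .format("iceberg")
  .option("as-of-timestamp", 1656633600000L)
  .load("db.my_table")
```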

Only the Parquet data format is supported in this initial version, and it provides only a per-file reader strategy for GPU acceleration. Multi-threaded and coalescing reader strategies are planned for the future.
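
For context, a rough sketch of how a user might opt in. The Iceberg config names here are my assumption, based on the plugin's existing spark.rapids.sql.format.* convention; check the documentation added by this PR for the authoritative names.

```scala
// Assumed config names following the spark.rapids.sql.format.* pattern.
spark.conf.set("spark.rapids.sql.format.iceberg.enabled", "true")
spark.conf.set("spark.rapids.sql.format.iceberg.read.enabled", "true")

// The analogous Parquet setting that selects the per-file reader strategy
// (this one appears in the premerge test log later in this thread).
spark.conf.set("spark.rapids.sql.format.parquet.reader.type", "PERFILE")
```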

This supports Apache Iceberg 0.13.x and leverages the Iceberg API and core code provided by whatever Iceberg jar the user supplies, on the assumption that those APIs are relatively stable over time. Related Apache Iceberg code for Parquet and Spark has been adapted for use within the RAPIDS Accelerator, as those interfaces are less likely to remain stable across Apache Iceberg versions. Reflection is used to port the relevant CPU scan state into an equivalent GPU-accelerated scan.
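
A minimal sketch, not the PR's actual code, of the reflection approach described above: pulling private state out of the CPU scan so an equivalent GPU scan can be constructed. The class and field names in the usage comment are illustrative assumptions.

```scala
import java.lang.reflect.Field

object ScanReflection {
  // Walk the class hierarchy to find a declared (possibly private) field and
  // return its value from the given object.
  def getField[T](obj: AnyRef, name: String): T = {
    var clazz: Class[_] = obj.getClass
    while (clazz != null) {
      try {
        val field: Field = clazz.getDeclaredField(name)
        field.setAccessible(true)
        return field.get(obj).asInstanceOf[T]
      } catch {
        case _: NoSuchFieldException => clazz = clazz.getSuperclass
      }
    }
    throw new NoSuchFieldException(s"$name not found on ${obj.getClass}")
  }
}

// Hypothetical usage: recover the Iceberg Table and pushed-down filters from a
// CPU scan node ("table" and "filterExpressions" are assumed field names).
// val table   = ScanReflection.getField[org.apache.iceberg.Table](cpuScan, "table")
// val filters = ScanReflection.getField[java.util.List[_]](cpuScan, "filterExpressions")
```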

Metadata queries and processing remain on the CPU, as they involve parsing relatively tiny JSON files that are well suited to CPU consumption. The data is read via the existing Parquet partition reader after row-group filtering and predicate pushdown have been applied.

@abellina (Collaborator) previously approved these changes Jul 18, 2022


This looks good. The new fallback code looks good as well. +1

@firestarman (Collaborator)

LGTM

@jlowe (Contributor, Author) commented Jul 20, 2022

Rebuilding CI to ensure Iceberg tests run after #6020.

@jlowe (Contributor, Author) commented Jul 20, 2022

build

@jlowe (Contributor, Author) commented Jul 21, 2022

build

@firestarman (Collaborator)

A strange error failed premerge; retrying it.

[2022-07-22T00:28:03.383Z] >               raise converted from None
[2022-07-22T00:28:03.384Z] E               pyspark.sql.utils.AnalysisException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwxr-xr-x
[2022-07-22T00:28:03.384Z] 
[2022-07-22T00:28:03.384Z] ../../../.download/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/utils.py:117: AnalysisException

@firestarman (Collaborator)

build

@pxLi (Member) commented Jul 22, 2022

The last premerge run failed in orc_test:

pyspark.sql.utils.AnalysisException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwxr-xr-x

There should be no permission issue in the premerge environment (Docker); not sure if this is a side effect of something else.

[2022-07-22T00:28:03.383Z] _ test_read_round_trip[-{'spark.rapids.sql.format.orc.reader.type': 'PERFILE'}-read_orc_sql-[Byte, Short, Integer, Long, Float, Double, String, Boolean, Date, Timestamp, Decimal(7,3), Decimal(12,2), Decimal(20,2)]] _
[2022-07-22T00:28:03.383Z] [gw1] linux -- Python 3.8.13 /usr/bin/python
[2022-07-22T00:28:03.383Z] 
[2022-07-22T00:28:03.383Z] spark_tmp_path = '/tmp/pyspark_tests//premerge-ci-2-jenkins-rapids-premerge-github-5165-79f8r-hwhvf-gw1-21368-140972398/'
[2022-07-22T00:28:03.383Z] orc_gens = [Byte, Short, Integer, Long, Float, Double, ...]
[2022-07-22T00:28:03.383Z] read_func = <function read_orc_sql at 0x7f00f4971d30>
[2022-07-22T00:28:03.383Z] reader_confs = {'spark.rapids.sql.format.orc.reader.type': 'PERFILE'}
[2022-07-22T00:28:03.383Z] v1_enabled_list = ''
[2022-07-22T00:28:03.383Z] 
[2022-07-22T00:28:03.383Z]     @pytest.mark.order(2)
[2022-07-22T00:28:03.383Z]     @pytest.mark.parametrize('orc_gens', orc_gens_list, ids=idfn)
[2022-07-22T00:28:03.383Z]     @pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
[2022-07-22T00:28:03.383Z]     @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
[2022-07-22T00:28:03.383Z]     @pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
[2022-07-22T00:28:03.383Z]     def test_read_round_trip(spark_tmp_path, orc_gens, read_func, reader_confs, v1_enabled_list):
[2022-07-22T00:28:03.383Z]         gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gens)]
[2022-07-22T00:28:03.383Z]         data_path = spark_tmp_path + '/ORC_DATA'
[2022-07-22T00:28:03.383Z]         with_cpu_session(
[2022-07-22T00:28:03.383Z]                 lambda spark : gen_df(spark, gen_list).write.orc(data_path))
[2022-07-22T00:28:03.383Z]         all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
[2022-07-22T00:28:03.383Z] >       assert_gpu_and_cpu_are_equal_collect(
[2022-07-22T00:28:03.383Z]                 read_func(data_path),
[2022-07-22T00:28:03.383Z]                 conf=all_confs)
[2022-07-22T00:28:03.383Z] 
[2022-07-22T00:28:03.383Z] ../../src/main/python/orc_test.py:142: 
[2022-07-22T00:28:03.383Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2022-07-22T00:28:03.383Z] ../../src/main/python/asserts.py:508: in assert_gpu_and_cpu_are_equal_collect
[2022-07-22T00:28:03.383Z]     _assert_gpu_and_cpu_are_equal(func, 'COLLECT', conf=conf, is_cpu_first=is_cpu_first)
[2022-07-22T00:28:03.383Z] ../../src/main/python/asserts.py:427: in _assert_gpu_and_cpu_are_equal
[2022-07-22T00:28:03.383Z]     run_on_cpu()
[2022-07-22T00:28:03.383Z] ../../src/main/python/asserts.py:413: in run_on_cpu
[2022-07-22T00:28:03.383Z]     from_cpu = with_cpu_session(bring_back, conf=conf)
[2022-07-22T00:28:03.383Z] ../../src/main/python/spark_session.py:115: in with_cpu_session
[2022-07-22T00:28:03.383Z]     return with_spark_session(func, conf=copy)
[2022-07-22T00:28:03.383Z] ../../src/main/python/spark_session.py:99: in with_spark_session
[2022-07-22T00:28:03.383Z]     ret = func(_spark)
[2022-07-22T00:28:03.383Z] ../../src/main/python/asserts.py:201: in <lambda>
[2022-07-22T00:28:03.383Z]     bring_back = lambda spark: limit_func(spark).collect()
[2022-07-22T00:28:03.383Z] ../../src/main/python/orc_test.py:31: in <lambda>
[2022-07-22T00:28:03.383Z]     return lambda spark : spark.sql('select * from orc.`{}`'.format(data_path))
[2022-07-22T00:28:03.383Z] ../../../.download/spark-3.1.1-bin-hadoop3.2/python/pyspark/sql/session.py:723: in sql
[2022-07-22T00:28:03.383Z]     return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
[2022-07-22T00:28:03.383Z] /home/jenkins/agent/workspace/jenkins-rapids_premerge-github-5165-ci-2/.download/spark-3.1.1-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py:1304: in __call__
[2022-07-22T00:28:03.383Z]     return_value = get_return_value(
[2022-07-22T00:28:03.383Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2022-07-22T00:28:03.383Z] 
[2022-07-22T00:28:03.383Z] a = ('xro13549', <py4j.java_gateway.GatewayClient object at 0x7f00cb751430>, 'o62', 'sql')
[2022-07-22T00:28:03.383Z] kw = {}
[2022-07-22T00:28:03.383Z] converted = AnalysisException('java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current per...e.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)\n\t... 74 more\n', JavaObject id=o13550)
[2022-07-22T00:28:03.383Z] 
[2022-07-22T00:28:03.383Z]     def deco(*a, **kw):
[2022-07-22T00:28:03.383Z]         try:
[2022-07-22T00:28:03.383Z]             return f(*a, **kw)
[2022-07-22T00:28:03.383Z]         except py4j.protocol.Py4JJavaError as e:
[2022-07-22T00:28:03.383Z]             converted = convert_exception(e.java_exception)
[2022-07-22T00:28:03.383Z]             if not isinstance(converted, UnknownException):
[2022-07-22T00:28:03.383Z]                 # Hide where the exception came from that shows a non-Pythonic
[2022-07-22T00:28:03.383Z]                 # JVM exception message.
[2022-07-22T00:28:03.383Z] >               raise converted from None
[2022-07-22T00:28:03.384Z] E               pyspark.sql.utils.AnalysisException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwxr-xr-x

@firestarman (Collaborator) commented Jul 25, 2022

Hi @jlowe,
It seems the Iceberg tests do not run in premerge. Do we need them to?

@jlowe (Contributor, Author) commented Jul 25, 2022

It seems the Iceberg tests do not run in premerge. Do we need them to?

I avoided adding them to premerge since the Iceberg tests currently run serially and would slow down premerge. I was going to file a followup to address this, but if you feel it should be part of premerge, I'm happy to update the PR.

@jlowe (Contributor, Author) commented Jul 25, 2022

build

@jlowe (Contributor, Author) commented Jul 25, 2022

build

@jlowe (Contributor, Author) commented Jul 25, 2022

build

@firestarman (Collaborator) commented Jul 26, 2022

if you feel it should be part of premerge, I'm happy to update the PR

A separate issue is good for me, and we can have more discussion about whether to add it.

Labels: feature request (New feature or request)
Projects: None yet
7 participants