From 5818905763a74fea00c2281daf0600c0a9647744 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 18 Mar 2022 14:45:30 -0500 Subject: [PATCH] Add shim for Databricks 10.4 [databricks] (#4974) * Add shim for Databricks 10.4 Signed-off-by: Jason Lowe * Add missing source directory for 304 shim * Add missing import * Remove unused HADOOP_FULL_VERSION * Fix Databricks version check to numerically compare * Add comments, code cleanup * Add 311+-db directory * Move FileOptions into a v2 shim * Fix Databricks version check --- .../rapids-shuffle.md | 27 +-- integration_tests/pom.xml | 4 +- .../src/main/python/array_test.py | 4 +- .../src/main/python/hash_aggregate_test.py | 5 +- integration_tests/src/main/python/map_test.py | 6 +- .../src/main/python/spark_session.py | 14 +- jenkins/databricks/build.sh | 167 +++++++++------- jenkins/databricks/test.sh | 5 + pom.xml | 84 ++++++++ sql-plugin/pom.xml | 16 +- .../rapids/shims/AggregationTagging.scala | 22 ++ .../com/nvidia/spark/rapids/SparkShims.scala | 2 +- .../rapids/shims/AggregationTagging.scala | 22 ++ .../rapids/shims/GpuHashPartitioning.scala | 45 +++++ ...c.scala => GpuRunningWindowExecMeta.scala} | 0 .../spark/rapids/shims/RapidsErrorUtils.scala | 0 .../spark/rapids/shims/Spark30XdbShims.scala | 10 + .../spark/sql/catalyst/csv/GpuCsvUtils.scala | 0 .../sql/catalyst/json/GpuJsonUtils.scala | 0 .../json/rapids/shims/FileOptionsShims.scala} | 4 +- .../rapids/shims/GpuHashPartitioning.scala | 0 .../spark/rapids/shims/RapidsErrorUtils.scala | 34 ++++ .../rapids/shims/Spark30Xuntil33XShims.scala | 3 +- .../spark/sql/catalyst/csv/GpuCsvUtils.scala | 21 ++ .../sql/catalyst/json/GpuJsonUtils.scala | 21 ++ .../json/rapids/shims/FileOptionsShims.scala | 32 +++ .../nvidia/spark/rapids/shims/AQEUtils.scala | 0 .../rapids/shims/GpuRegExpReplaceExec.scala | 0 .../shims/GpuRunningWindowExecMeta.scala} | 0 .../rapids/shims/GpuWindowInPandasExec.scala | 0 .../shims/GpuGroupUDFArrowPythonRunner.scala | 0 .../sql/rapids/shims/GpuFileScanRDD.scala | 0 .../rapids/shims/GpuHashPartitioning.scala | 45 +++++ .../spark/rapids/shims/SparkShims.scala | 3 +- .../rapids/shims/AggregationTagging.scala | 22 ++ .../spark/rapids/shims/RapidsErrorUtils.scala | 34 ++++ .../spark/rapids/shims/Spark31XdbShims.scala | 10 + .../spark/sql/catalyst/csv/GpuCsvUtils.scala | 21 ++ .../sql/catalyst/json/GpuJsonUtils.scala | 21 ++ .../json/rapids/shims/FileOptionsShims.scala | 32 +++ .../rapids/shims/Spark320PlusNonDBShims.scala | 47 +++++ .../rapids/shims/Spark320PlusShims.scala | 20 +- .../spark/rapids/shims/SparkShims.scala | 4 +- .../spark/rapids/shims/SparkShims.scala | 4 +- .../rapids/shims/AggregationTagging.scala | 23 +++ .../rapids/shims/GpuHashPartitioning.scala | 44 ++++ .../spark/rapids/shims/RapidsErrorUtils.scala | 39 ++++ .../shims/ShimBroadcastExchangeLike.scala | 33 +++ .../spark/rapids/shims/SparkShims.scala | 189 ++++++++++++++++++ .../spark321db/SparkShimServiceProvider.scala | 34 ++++ .../spark321db/RapidsShuffleManager.scala | 25 +++ .../rapids/shims/GpuShuffleExchangeExec.scala | 73 +++++++ .../spark/sql/catalyst/csv/GpuCsvUtils.scala | 24 +++ .../sql/catalyst/json/GpuJsonUtils.scala | 24 +++ .../json/rapids/shims/FileOptionsShims.scala} | 4 +- .../shims/GpuFlatMapGroupsInPandasExec.scala | 188 +++++++++++++++++ .../RapidsShuffleInternalManager.scala | 59 ++++++ .../spark/rapids/shims/SparkShims.scala | 4 +- .../spark/rapids/shims/Spark33XShims.scala | 3 +- .../json/rapids/shims/FileOptionsShims.scala | 32 +++ .../spark/rapids/GpuBatchScanExec.scala | 4 +- 
.../com/nvidia/spark/rapids/SparkShims.scala | 2 - .../com/nvidia/spark/rapids/aggregate.scala | 71 +++++-- .../catalyst/json/rapids/GpuJsonScan.scala | 4 +- .../metrics/source/MockTaskContext.scala | 8 +- tests/pom.xml | 10 +- 66 files changed, 1545 insertions(+), 163 deletions(-) create mode 100644 sql-plugin/src/main/301+-nondb/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala create mode 100644 sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala create mode 100644 sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala rename sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/{GpuRunningWindowExec.scala => GpuRunningWindowExecMeta.scala} (100%) rename sql-plugin/src/main/{301until330-all => 301db}/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala (100%) rename sql-plugin/src/main/{301until330-all => 301db}/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala (100%) rename sql-plugin/src/main/{301until330-all => 301db}/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala (100%) rename sql-plugin/src/main/{301until330-all/scala/org/apache/spark/sql/catalyst/json/rapids/shims/Spark30Xuntil33XFileOptionsShims.scala => 301db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala} (91%) rename sql-plugin/src/main/{301until330-all => 301until330-nondb}/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala (100%) create mode 100644 sql-plugin/src/main/301until330-nondb/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala rename sql-plugin/src/main/{301until330-all => 301until330-nondb}/scala/com/nvidia/spark/rapids/shims/Spark30Xuntil33XShims.scala (89%) create mode 100644 sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala create mode 100644 sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala create mode 100644 sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala rename sql-plugin/src/main/{31xdb => 311+-db}/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala (100%) rename sql-plugin/src/main/{31xdb => 311+-db}/scala/com/nvidia/spark/rapids/shims/GpuRegExpReplaceExec.scala (100%) rename sql-plugin/src/main/{31xdb/scala/com/nvidia/spark/rapids/shims/GpuRunningWindowExec.scala => 311+-db/scala/com/nvidia/spark/rapids/shims/GpuRunningWindowExecMeta.scala} (100%) rename sql-plugin/src/main/{31xdb => 311+-db}/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala (100%) rename sql-plugin/src/main/{31xdb => 311+-db}/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala (100%) rename sql-plugin/src/main/{31xdb => 311+-db}/scala/org/apache/spark/sql/rapids/shims/GpuFileScanRDD.scala (100%) create mode 100644 sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala create mode 100644 sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala create mode 100644 sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala create mode 100644 sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala create mode 100644 sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala create mode 100644 sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala create mode 100644 
sql-plugin/src/main/320+-nondb/scala/com/nvidia/spark/rapids/shims/Spark320PlusNonDBShims.scala create mode 100644 sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala create mode 100644 sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala create mode 100644 sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala create mode 100644 sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimBroadcastExchangeLike.scala create mode 100644 sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala create mode 100644 sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/spark321db/SparkShimServiceProvider.scala create mode 100644 sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/spark321db/RapidsShuffleManager.scala create mode 100644 sql-plugin/src/main/321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala create mode 100644 sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala create mode 100644 sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala rename sql-plugin/src/main/{330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/Spark33XFileOptionsShims.scala => 321db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala} (90%) create mode 100644 sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala create mode 100644 sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/shims/spark321db/RapidsShuffleInternalManager.scala create mode 100644 sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala diff --git a/docs/additional-functionality/rapids-shuffle.md b/docs/additional-functionality/rapids-shuffle.md index f537494984f..679feec1f53 100644 --- a/docs/additional-functionality/rapids-shuffle.md +++ b/docs/additional-functionality/rapids-shuffle.md @@ -285,19 +285,20 @@ In this section, we are using a docker container built using the sample dockerfi 1. Choose the version of the shuffle manager that matches your Spark version. 
Currently we support:
- | Spark Shim | spark.shuffle.manager value |
- | --------------| -------------------------------------------------------- |
- | 3.0.1 | com.nvidia.spark.rapids.spark301.RapidsShuffleManager |
- | 3.0.2 | com.nvidia.spark.rapids.spark302.RapidsShuffleManager |
- | 3.0.3 | com.nvidia.spark.rapids.spark303.RapidsShuffleManager |
- | 3.1.1 | com.nvidia.spark.rapids.spark311.RapidsShuffleManager |
- | 3.1.1 CDH | com.nvidia.spark.rapids.spark311cdh.RapidsShuffleManager |
- | 3.1.2 | com.nvidia.spark.rapids.spark312.RapidsShuffleManager |
- | 3.1.3 | com.nvidia.spark.rapids.spark313.RapidsShuffleManager |
- | 3.2.0 | com.nvidia.spark.rapids.spark320.RapidsShuffleManager |
- | 3.2.1 | com.nvidia.spark.rapids.spark321.RapidsShuffleManager |
- | Databricks 7.3| com.nvidia.spark.rapids.spark301db.RapidsShuffleManager |
- | Databricks 9.1| com.nvidia.spark.rapids.spark312db.RapidsShuffleManager |
+ | Spark Shim | spark.shuffle.manager value |
+ | --------------- | -------------------------------------------------------- |
+ | 3.0.1 | com.nvidia.spark.rapids.spark301.RapidsShuffleManager |
+ | 3.0.2 | com.nvidia.spark.rapids.spark302.RapidsShuffleManager |
+ | 3.0.3 | com.nvidia.spark.rapids.spark303.RapidsShuffleManager |
+ | 3.1.1 | com.nvidia.spark.rapids.spark311.RapidsShuffleManager |
+ | 3.1.1 CDH | com.nvidia.spark.rapids.spark311cdh.RapidsShuffleManager |
+ | 3.1.2 | com.nvidia.spark.rapids.spark312.RapidsShuffleManager |
+ | 3.1.3 | com.nvidia.spark.rapids.spark313.RapidsShuffleManager |
+ | 3.2.0 | com.nvidia.spark.rapids.spark320.RapidsShuffleManager |
+ | 3.2.1 | com.nvidia.spark.rapids.spark321.RapidsShuffleManager |
+ | Databricks 7.3 | com.nvidia.spark.rapids.spark301db.RapidsShuffleManager |
+ | Databricks 9.1 | com.nvidia.spark.rapids.spark312db.RapidsShuffleManager |
+ | Databricks 10.4 | com.nvidia.spark.rapids.spark321db.RapidsShuffleManager |
2.
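As a rough illustration of how the new table entry is consumed (not part of this patch), the sketch below points a PySpark session at the Databricks 10.4 shuffle manager class added above. It assumes the rapids-4-spark plugin jar is already on the cluster classpath; on Databricks these properties are normally set in the cluster's Spark config rather than in application code.

```python
from pyspark.sql import SparkSession

# Minimal sketch: choose the shuffle manager class that matches the Spark shim.
# com.nvidia.spark.rapids.spark321db.RapidsShuffleManager is the Databricks 10.4
# entry from the table above; other RAPIDS/UCX settings are omitted for brevity.
spark = (
    SparkSession.builder
        .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
        .config("spark.shuffle.manager",
                "com.nvidia.spark.rapids.spark321db.RapidsShuffleManager")
        .getOrCreate()
)
```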
Settings for UCX 1.11.2+: diff --git a/integration_tests/pom.xml b/integration_tests/pom.xml index eef75b9c1bc..eb1feb329c6 100644 --- a/integration_tests/pom.xml +++ b/integration_tests/pom.xml @@ -208,8 +208,8 @@ org.apache.hadoop - hadoop-common - ${spark.version} + hadoop-client + ${hadoop.client.version} provided diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py index 4572d42ece7..bd522d62d78 100644 --- a/integration_tests/src/main/python/array_test.py +++ b/integration_tests/src/main/python/array_test.py @@ -16,7 +16,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_and_cpu_error, assert_gpu_fallback_collect from data_gen import * -from spark_session import is_before_spark_311, is_before_spark_330 +from spark_session import is_before_spark_311, is_before_spark_330, is_databricks104_or_later from pyspark.sql.types import * from pyspark.sql.types import IntegralType from pyspark.sql.functions import array_contains, col, isnan, element_at @@ -78,7 +78,7 @@ def test_array_item_with_strict_index(strict_index_enabled, index): reason="Only in Spark [3.1.1, 3.3.0) with ANSI mode, it throws exceptions for invalid index") @pytest.mark.parametrize('index', [-2, 100, array_neg_index_gen, array_out_index_gen], ids=idfn) def test_array_item_ansi_fail_invalid_index(index): - message = "java.lang.ArrayIndexOutOfBoundsException" + message = "SparkArrayIndexOutOfBoundsException" if is_databricks104_or_later() else "java.lang.ArrayIndexOutOfBoundsException" if isinstance(index, int): test_func = lambda spark: unary_op_df(spark, ArrayGen(int_gen)).select(col('a')[index]).collect() else: diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index ba22427c8a6..c59225ddc09 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -24,7 +24,7 @@ from pyspark.sql.types import * from marks import * import pyspark.sql.functions as f -from spark_session import is_before_spark_311, with_cpu_session +from spark_session import is_before_spark_311, is_databricks104_or_later, with_cpu_session pytestmark = pytest.mark.nightly_resource_consuming_test @@ -421,6 +421,7 @@ def test_hash_grpby_avg(data_gen, conf): @pytest.mark.parametrize('data_gen', [ StructGen(children=[('a', int_gen), ('b', int_gen)],nullable=False, special_cases=[((None, None), 400.0), ((None, -1542301795), 100.0)])], ids=idfn) +@pytest.mark.xfail(condition=is_databricks104_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/4963') def test_hash_avg_nulls_partial_only(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: gen_df(spark, data_gen, length=2).agg(f.avg('b')), @@ -791,6 +792,7 @@ def test_hash_groupby_collect_partial_replace_fallback(data_gen, @pytest.mark.parametrize('replace_mode', _replace_modes_single_distinct, ids=idfn) @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn) @pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn) +@pytest.mark.xfail(condition=is_databricks104_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/4963') def test_hash_groupby_collect_partial_replace_with_distinct_fallback(data_gen, replace_mode, aqe_enabled, @@ -1668,6 +1670,7 @@ def test_groupby_std_variance_nulls(data_gen, conf, ansi_enabled): @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), 
ids=idfn) @pytest.mark.parametrize('replace_mode', _replace_modes_non_distinct, ids=idfn) @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn) +@pytest.mark.xfail(condition=is_databricks104_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/4963') def test_groupby_std_variance_partial_replace_fallback(data_gen, conf, replace_mode, diff --git a/integration_tests/src/main/python/map_test.py b/integration_tests/src/main/python/map_test.py index 9530a12f818..a45fdbae893 100644 --- a/integration_tests/src/main/python/map_test.py +++ b/integration_tests/src/main/python/map_test.py @@ -18,7 +18,7 @@ assert_gpu_fallback_collect from data_gen import * from marks import incompat, allow_non_gpu -from spark_session import is_before_spark_311, is_before_spark_330 +from spark_session import is_before_spark_311, is_before_spark_330, is_databricks104_or_later from pyspark.sql.types import * from pyspark.sql.types import IntegralType @@ -215,7 +215,7 @@ def test_str_to_map_expr_with_all_regex_delimiters(): reason="Only in Spark 3.1.1+ (< 3.3.0) + ANSI mode, map key throws on no such element") @pytest.mark.parametrize('data_gen', [simple_string_to_string_map_gen], ids=idfn) def test_simple_get_map_value_ansi_fail(data_gen): - message = "java.util.NoSuchElementException" + message = "org.apache.spark.SparkNoSuchElementException" if is_databricks104_or_later() else "java.util.NoSuchElementException" assert_gpu_and_cpu_error( lambda spark: unary_op_df(spark, data_gen).selectExpr( 'a["NOT_FOUND"]').collect(), @@ -264,7 +264,7 @@ def test_simple_element_at_map(data_gen): @pytest.mark.skipif(is_before_spark_311(), reason="Only in Spark 3.1.1 + ANSI mode, map key throws on no such element") @pytest.mark.parametrize('data_gen', [simple_string_to_string_map_gen], ids=idfn) def test_map_element_at_ansi_fail(data_gen): - message = "org.apache.spark.SparkNoSuchElementException" if not is_before_spark_330() else "java.util.NoSuchElementException" + message = "org.apache.spark.SparkNoSuchElementException" if (not is_before_spark_330() or is_databricks104_or_later()) else "java.util.NoSuchElementException" # For 3.3.0+ strictIndexOperator should not affect element_at test_conf=copy_and_update(ansi_enabled_conf, {'spark.sql.ansi.strictIndexOperator': 'false'}) assert_gpu_and_cpu_error( diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index b2805844f0f..cfe2acd65e8 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -142,6 +142,16 @@ def is_before_spark_330(): def is_spark_330_or_later(): return spark_version() >= "3.3.0" -def is_databricks91_or_later(): +def is_databricks_version_or_later(major, minor): spark = get_spark_i_know_what_i_am_doing() - return spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "") >= "9.1" + version = spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "0.0") + parts = version.split(".") + if (len(parts) < 2): + raise RuntimeError("Unable to determine Databricks version from version string: " + version) + return int(parts[0]) >= major and int(parts[1]) >= minor + +def is_databricks91_or_later(): + return is_databricks_version_or_later(9, 1) + +def is_databricks104_or_later(): + return is_databricks_version_or_later(10, 4) diff --git a/jenkins/databricks/build.sh b/jenkins/databricks/build.sh index eb37f2b990e..2e5d5f6cc48 100755 --- a/jenkins/databricks/build.sh +++ b/jenkins/databricks/build.sh @@ 
-68,80 +68,119 @@ CUDF_JAR=${M2DIR}/ai/rapids/cudf/${CUDF_VERSION}/cudf-${CUDF_VERSION}-${CUDA_VER # pull normal Spark artifacts and ignore errors then install databricks jars, then build again JARDIR=/databricks/jars # install the Spark pom file so we get dependencies -SQLJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--sql--core--core-hive-2.3__hadoop-2.7_${SCALA_VERSION}_deploy.jar -CATALYSTJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--sql--catalyst--catalyst-hive-2.3__hadoop-2.7_${SCALA_VERSION}_deploy.jar -ANNOTJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--common--tags--tags-hive-2.3__hadoop-2.7_${SCALA_VERSION}_deploy.jar -COREJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--core--core-hive-2.3__hadoop-2.7_${SCALA_VERSION}_deploy.jar +case "$BASE_SPARK_VERSION" in + "3.2.1") + COMMONS_LANG3_VERSION=3.12.0 + COMMONS_IO_VERSION=2.8.0 + DB_VERSION=-0003 + FASTERXML_JACKSON_VERSION=2.12.3 + HADOOP_VERSION=3.2 + HIVE_FULL_VERSION=2.3.9 + JSON4S_VERSION=3.7.0-M11 + ORC_VERSION=1.6.12 + PARQUET_VERSION=1.12.0 + ;; + "3.1.2") + COMMONS_LANG3_VERSION=3.10 + COMMONS_IO_VERSION=2.4 + DB_VERSION=9 + FASTERXML_JACKSON_VERSION=2.10.0 + HADOOP_VERSION=2.7 + HIVE_FULL_VERSION=2.3.7 + JSON4S_VERSION=3.7.0-M5 + ORC_VERSION=1.5.12 + PARQUET_VERSION=1.10.1 + ;; + "3.0.1") + COMMONS_LANG3_VERSION=3.9 + COMMONS_IO_VERSION=2.4 + DB_VERSION=6 + FASTERXML_JACKSON_VERSION=2.10.0 + HADOOP_VERSION=2.7 + HIVE_FULL_VERSION=2.3.7 + JSON4S_VERSION=3.6.6 + ORC_VERSION=1.5.10 + PARQUET_VERSION=1.10.1 + ;; + *) echo "Unexpected Spark version: $BASE_SPARK_VERSION"; exit 1;; +esac + +SQLJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--sql--core--core-hive-2.3__hadoop-${HADOOP_VERSION}_${SCALA_VERSION}_deploy.jar +CATALYSTJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--sql--catalyst--catalyst-hive-2.3__hadoop-${HADOOP_VERSION}_${SCALA_VERSION}_deploy.jar +ANNOTJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--common--tags--tags-hive-2.3__hadoop-${HADOOP_VERSION}_${SCALA_VERSION}_deploy.jar +COREJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--core--core-hive-2.3__hadoop-${HADOOP_VERSION}_${SCALA_VERSION}_deploy.jar + COREPOM=spark-core_${SCALA_VERSION}-${BASE_SPARK_VERSION}.pom -HIVEJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--sql--hive--hive_2.12_deploy_shaded.jar +if [[ $BASE_SPARK_VERSION == "3.2.1" ]] +then + HIVEJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--sql--hive--hive-hive-2.3__hadoop-${HADOOP_VERSION}_${SCALA_VERSION}_deploy_shaded.jar +else + HIVEJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--sql--hive--hive_${SCALA_VERSION}_deploy_shaded.jar +fi + COREPOMPATH=$M2DIR/org/apache/spark/spark-core_${SCALA_VERSION}/${BASE_SPARK_VERSION} -# We may ened to come up with way to specify versions but for now hardcode and deal with for next Databricks version -HIVEEXECJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.hive--hive-exec-core--org.apache.hive__hive-exec-core__2.3.7.jar -HIVESERDEJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.hive--hive-serde--org.apache.hive__hive-serde__2.3.7.jar -HIVESTORAGE=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.hive--hive-storage-api--org.apache.hive__hive-storage-api__2.7.2.jar +# We may need to come up with way to specify versions but for now hardcode and deal with for next Databricks version 
+HIVEEXECJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hive--hive-exec-core--org.apache.hive__hive-exec-core__${HIVE_FULL_VERSION}.jar +HIVESERDEJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hive--hive-serde--org.apache.hive__hive-serde__${HIVE_FULL_VERSION}.jar +HIVESTORAGE=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hive--hive-storage-api--org.apache.hive__hive-storage-api__2.7.2.jar + +PARQUETHADOOPJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.parquet--parquet-hadoop--org.apache.parquet__parquet-hadoop__${PARQUET_VERSION}-databricks${DB_VERSION}.jar +PARQUETCOMMONJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.parquet--parquet-common--org.apache.parquet__parquet-common__${PARQUET_VERSION}-databricks${DB_VERSION}.jar +PARQUETCOLUMNJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.parquet--parquet-column--org.apache.parquet__parquet-column__${PARQUET_VERSION}-databricks${DB_VERSION}.jar +ORC_CORE_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.orc--orc-core--org.apache.orc__orc-core__${ORC_VERSION}.jar +ORC_SHIM_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.orc--orc-shims--org.apache.orc__orc-shims__${ORC_VERSION}.jar +ORC_MAPREDUCE_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.orc--orc-mapreduce--org.apache.orc__orc-mapreduce__${ORC_VERSION}.jar -if [[ $BASE_SPARK_VERSION == "3.1.2" ]] +PROTOBUF_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--com.google.protobuf--protobuf-java--com.google.protobuf__protobuf-java__2.6.1.jar +if [[ $BASE_SPARK_VERSION == "3.2.1" ]] then - PARQUETHADOOPJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-hadoop--org.apache.parquet__parquet-hadoop__1.10.1-databricks9.jar - PARQUETCOMMONJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-common--org.apache.parquet__parquet-common__1.10.1-databricks9.jar - PARQUETCOLUMNJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-column--org.apache.parquet__parquet-column__1.10.1-databricks9.jar - ORC_CORE_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-core--org.apache.orc__orc-core__1.5.12.jar - ORC_SHIM_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-shims--org.apache.orc__orc-shims__1.5.12.jar - ORC_MAPREDUCE_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-mapreduce--org.apache.orc__orc-mapreduce__1.5.12.jar + PARQUETFORMATJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.parquet--parquet-format-structures--org.apache.parquet__parquet-format-structures__${PARQUET_VERSION}-databricks${DB_VERSION}.jar else - 
PARQUETHADOOPJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-hadoop--org.apache.parquet__parquet-hadoop__1.10.1-databricks6.jar - PARQUETCOMMONJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-common--org.apache.parquet__parquet-common__1.10.1-databricks6.jar - PARQUETCOLUMNJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-column--org.apache.parquet__parquet-column__1.10.1-databricks6.jar - ORC_CORE_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-core--org.apache.orc__orc-core__1.5.10.jar - ORC_SHIM_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-shims--org.apache.orc__orc-shims__1.5.10.jar - ORC_MAPREDUCE_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.orc--orc-mapreduce--org.apache.orc__orc-mapreduce__1.5.10.jar + PARQUETFORMATJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.parquet--parquet-format--org.apache.parquet__parquet-format__2.4.0.jar fi -PROTOBUF_JAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--com.google.protobuf--protobuf-java--com.google.protobuf__protobuf-java__2.6.1.jar -PARQUETFORMATJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.parquet--parquet-format--org.apache.parquet__parquet-format__2.4.0.jar +NETWORKCOMMON=----workspace_${SPARK_MAJOR_VERSION_STRING}--common--network-common--network-common-hive-2.3__hadoop-${HADOOP_VERSION}_${SCALA_VERSION}_deploy.jar +NETWORKSHUFFLE=----workspace_${SPARK_MAJOR_VERSION_STRING}--common--network-shuffle--network-shuffle-hive-2.3__hadoop-${HADOOP_VERSION}_${SCALA_VERSION}_deploy.jar +COMMONUNSAFE=----workspace_${SPARK_MAJOR_VERSION_STRING}--common--unsafe--unsafe-hive-2.3__hadoop-${HADOOP_VERSION}_${SCALA_VERSION}_deploy.jar +LAUNCHER=----workspace_${SPARK_MAJOR_VERSION_STRING}--launcher--launcher-hive-2.3__hadoop-${HADOOP_VERSION}_${SCALA_VERSION}_deploy.jar -NETWORKCOMMON=----workspace_${SPARK_MAJOR_VERSION_STRING}--common--network-common--network-common-hive-2.3__hadoop-2.7_2.12_deploy.jar -COMMONUNSAFE=----workspace_${SPARK_MAJOR_VERSION_STRING}--common--unsafe--unsafe-hive-2.3__hadoop-2.7_2.12_deploy.jar -LAUNCHER=----workspace_${SPARK_MAJOR_VERSION_STRING}--launcher--launcher-hive-2.3__hadoop-2.7_2.12_deploy.jar +KRYO=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--com.esotericsoftware--kryo-shaded--com.esotericsoftware__kryo-shaded__4.0.2.jar -KRYO=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--com.esotericsoftware--kryo-shaded--com.esotericsoftware__kryo-shaded__4.0.2.jar - -APACHECOMMONS=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--commons-io--commons-io--commons-io__commons-io__2.4.jar +APACHECOMMONS=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--commons-io--commons-io--commons-io__commons-io__${COMMONS_IO_VERSION}.jar +APACHECOMMONSLANG3=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.commons--commons-lang3--org.apache.commons__commons-lang3__${COMMONS_LANG3_VERSION}.jar if [[ $BASE_SPARK_VERSION == "3.0.1" ]] then - 
JSON4S=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.json4s--json4s-ast_2.12--org.json4s__json4s-ast_2.12__3.6.6.jar - APACHECOMMONSLANG3=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.commons--commons-lang3--org.apache.commons__commons-lang3__3.9.jar - HIVESTORAGE=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.hive--hive-storage-api--org.apache.hive__hive-storage-api__2.7.1.jar - ARROWFORMATJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.arrow--arrow-format--org.apache.arrow__arrow-format__0.15.1.jar - ARROWMEMORYJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.arrow--arrow-memory--org.apache.arrow__arrow-memory__0.15.1.jar - ARROWVECTORJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.arrow--arrow-vector--org.apache.arrow__arrow-vector__0.15.1.jar + HIVESTORAGE=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hive--hive-storage-api--org.apache.hive__hive-storage-api__2.7.1.jar + ARROWFORMATJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.arrow--arrow-format--org.apache.arrow__arrow-format__0.15.1.jar + ARROWMEMORYJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.arrow--arrow-memory--org.apache.arrow__arrow-memory__0.15.1.jar + ARROWVECTORJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.arrow--arrow-vector--org.apache.arrow__arrow-vector__0.15.1.jar HIVEEXECJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--patched-hive-with-glue--hive-exec-core_shaded.jar else - JSON4S=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.json4s--json4s-ast_2.12--org.json4s__json4s-ast_2.12__3.7.0-M5.jar - APACHECOMMONSLANG3=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.commons--commons-lang3--org.apache.commons__commons-lang3__3.10.jar - HIVESTORAGE=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.hive--hive-storage-api--org.apache.hive__hive-storage-api__2.7.2.jar - if [[ $BASE_SPARK_VERSION == "3.1.2" ]]; then - HIVEEXECJAR=----workspace_spark_3_1--patched-hive-with-glue--hive-exec-core_shaded.jar + HIVESTORAGE=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hive--hive-storage-api--org.apache.hive__hive-storage-api__2.7.2.jar + if [[ $BASE_SPARK_VERSION != "3.0.1" ]]; then + HIVEEXECJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--patched-hive-with-glue--hive-exec-core_shaded.jar else - HIVEEXECJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.hive--hive-exec-core--org.apache.hive__hive-exec-core__2.3.7.jar + HIVEEXECJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hive--hive-exec-core--org.apache.hive__hive-exec-core__2.3.7.jar fi - ARROWFORMATJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.arrow--arrow-format--org.apache.arrow__arrow-format__2.0.0.jar - 
ARROWMEMORYJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.arrow--arrow-memory-core--org.apache.arrow__arrow-memory-core__2.0.0.jar - ARROWMEMORYNETTYJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.arrow--arrow-memory-netty--org.apache.arrow__arrow-memory-netty__2.0.0.jar - ARROWVECTORJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.arrow--arrow-vector--org.apache.arrow__arrow-vector__2.0.0.jar + ARROWFORMATJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.arrow--arrow-format--org.apache.arrow__arrow-format__2.0.0.jar + ARROWMEMORYJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.arrow--arrow-memory-core--org.apache.arrow__arrow-memory-core__2.0.0.jar + ARROWMEMORYNETTYJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.arrow--arrow-memory-netty--org.apache.arrow__arrow-memory-netty__2.0.0.jar + ARROWVECTORJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.arrow--arrow-vector--org.apache.arrow__arrow-vector__2.0.0.jar fi -JAVAASSIST=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.javassist--javassist--org.javassist__javassist__3.25.0-GA.jar +JSON4S=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.json4s--json4s-ast_2.12--org.json4s__json4s-ast_2.12__${JSON4S_VERSION}.jar + +JAVAASSIST=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.javassist--javassist--org.javassist__javassist__3.25.0-GA.jar -PROTOBUFJAVA=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--com.google.protobuf--protobuf-java--com.google.protobuf__protobuf-java__2.6.1.jar +PROTOBUFJAVA=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--com.google.protobuf--protobuf-java--com.google.protobuf__protobuf-java__2.6.1.jar -JACKSONCORE=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--com.fasterxml.jackson.core--jackson-databind--com.fasterxml.jackson.core__jackson-databind__2.10.0.jar -JACKSONANNOTATION=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--com.fasterxml.jackson.core--jackson-annotations--com.fasterxml.jackson.core__jackson-annotations__2.10.0.jar +JACKSONCORE=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--com.fasterxml.jackson.core--jackson-databind--com.fasterxml.jackson.core__jackson-databind__${FASTERXML_JACKSON_VERSION}.jar +JACKSONANNOTATION=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--com.fasterxml.jackson.core--jackson-annotations--com.fasterxml.jackson.core__jackson-annotations__${FASTERXML_JACKSON_VERSION}.jar -HADOOPCOMMON=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.hadoop--hadoop-common--org.apache.hadoop__hadoop-common__2.7.4.jar -HADOOPMAPRED=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.hadoop--hadoop-mapreduce-client-core--org.apache.hadoop__hadoop-mapreduce-client-core__2.7.4.jar 
+HADOOPCOMMON=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hadoop--hadoop-common--org.apache.hadoop__hadoop-common__2.7.4.jar +HADOOPMAPRED=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hadoop--hadoop-mapreduce-client-core--org.apache.hadoop__hadoop-mapreduce-client-core__2.7.4.jar # Please note we are installing all of these dependencies using the Spark version (SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS) to make it easier # to specify the dependencies in the pom files @@ -266,6 +305,14 @@ mvn -B install:install-file \ -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ -Dpackaging=jar +mvn -B install:install-file \ + -Dmaven.repo.local=$M2DIR \ + -Dfile=$JARDIR/$NETWORKSHUFFLE \ + -DgroupId=org.apache.spark \ + -DartifactId=spark-network-shuffle_$SCALA_VERSION \ + -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ + -Dpackaging=jar + mvn -B install:install-file \ -Dmaven.repo.local=$M2DIR \ -Dfile=$JARDIR/$COMMONUNSAFE\ @@ -354,22 +401,6 @@ mvn -B install:install-file \ -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ -Dpackaging=jar -mvn -B install:install-file \ - -Dmaven.repo.local=$M2DIR \ - -Dfile=$JARDIR/$HADOOPCOMMON \ - -DgroupId=org.apache.hadoop \ - -DartifactId=hadoop-common \ - -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ - -Dpackaging=jar - -mvn -B install:install-file \ - -Dmaven.repo.local=$M2DIR \ - -Dfile=$JARDIR/$HADOOPMAPRED \ - -DgroupId=org.apache.hadoop \ - -DartifactId=hadoop-mapreduce-client \ - -Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \ - -Dpackaging=jar - mvn -B install:install-file \ -Dmaven.repo.local=$M2DIR \ -Dfile=$JARDIR/$ORC_CORE_JAR \ diff --git a/jenkins/databricks/test.sh b/jenkins/databricks/test.sh index 6dbe30e52d6..0322f2f7ddb 100755 --- a/jenkins/databricks/test.sh +++ b/jenkins/databricks/test.sh @@ -34,6 +34,11 @@ export SPARK_HOME=/databricks/spark # change to not point at databricks confs so we don't conflict with their settings export SPARK_CONF_DIR=$PWD export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark/:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip +if [[ $BASE_SPARK_VER == "3.2.1" ]] +then + # Databricks Koalas can conflict with the actual Pandas version, so put site packages first + export PYTHONPATH=/databricks/python3/lib/python3.8/site-packages:$PYTHONPATH +fi sudo ln -s /databricks/jars/ $SPARK_HOME/jars || true sudo chmod 777 /databricks/data/logs/ sudo chmod 777 /databricks/data/logs/* diff --git a/pom.xml b/pom.xml index 74336372b98..bfe53d963c9 100644 --- a/pom.xml +++ b/pom.xml @@ -122,6 +122,7 @@ ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -172,6 +173,7 @@ ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -231,6 +233,7 @@ ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -284,6 +287,7 @@ ${project.basedir}/src/main/301until320-noncdh/scala 
${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/pre320-treenode/scala @@ -335,6 +339,7 @@ ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until320-all/scala @@ -384,6 +389,7 @@ --> ${spark301db.version} ${spark301db.version} + 2.7.4 true @@ -444,6 +450,7 @@ --> ${spark312db.version} ${spark312db.version} + 2.7.4 true @@ -464,6 +471,7 @@ ${project.basedir}/src/main/301until330-all/scala ${project.basedir}/src/main/311until320-all/scala ${project.basedir}/src/main/311+-all/scala + ${project.basedir}/src/main/311+-db/scala ${project.basedir}/src/main/311until320-noncdh/scala ${project.basedir}/src/main/31xdb/scala ${project.basedir}/src/main/post320-treenode/scala @@ -515,6 +523,7 @@ ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until320-all/scala @@ -572,6 +581,7 @@ ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until320-all/scala @@ -629,6 +639,7 @@ ${project.basedir}/src/main/301until320-noncdh/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311until320-all/scala @@ -693,10 +704,12 @@ ${project.basedir}/src/main/301+-nondb/scala ${project.basedir}/src/main/320/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/320/scala ${project.basedir}/src/main/320+/scala + ${project.basedir}/src/main/320+-nondb/scala ${project.basedir}/src/main/320until330-all/scala ${project.basedir}/src/main/post320-treenode/scala @@ -756,9 +769,11 @@ ${project.basedir}/src/main/301+-nondb/scala ${project.basedir}/src/main/321/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/320+/scala + ${project.basedir}/src/main/320+-nondb/scala ${project.basedir}/src/main/320until330-all/scala ${project.basedir}/src/main/321+/scala ${project.basedir}/src/main/post320-treenode/scala @@ -819,9 +834,11 @@ ${project.basedir}/src/main/301+-nondb/scala ${project.basedir}/src/main/322/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala 
${project.basedir}/src/main/320+/scala + ${project.basedir}/src/main/320+-nondb/scala ${project.basedir}/src/main/321+/scala ${project.basedir}/src/main/320until330-all/scala ${project.basedir}/src/main/post320-treenode/scala @@ -855,6 +872,69 @@ tests-spark310+ + + + release321db + + + buildver + 321db + + + + + 3.4.4 + spark321db + spark321db + + ${spark321db.version} + ${spark321db.version} + 3.3.1 + true + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-profile-src-31+ + add-source + none + + + ${project.basedir}/src/main/321db/scala + ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/311+-all/scala + ${project.basedir}/src/main/311+-db/scala + ${project.basedir}/src/main/320+/scala + ${project.basedir}/src/main/321+/scala + ${project.basedir}/src/main/320until330-all/scala + ${project.basedir}/src/main/post320-treenode/scala + + + + + + + + + common + dist + integration_tests + shuffle-plugin + sql-plugin + tests + udf-compiler + aggregator + + release330 @@ -884,6 +964,7 @@ ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/320+/scala + ${project.basedir}/src/main/320+-nondb/scala ${project.basedir}/src/main/321+/scala ${project.basedir}/src/main/330+/scala ${project.basedir}/src/main/post320-treenode/scala @@ -947,6 +1028,7 @@ ${project.basedir}/src/main/301until320-all/scala ${project.basedir}/src/main/301until320-nondb/scala ${project.basedir}/src/main/301until330-all/scala + ${project.basedir}/src/main/301until330-nondb/scala ${project.basedir}/src/main/311+-all/scala ${project.basedir}/src/main/311+-nondb/scala ${project.basedir}/src/main/311cdh/scala @@ -1067,6 +1149,7 @@ 3.1.4-SNAPSHOT 3.2.0 3.2.1 + 3.2.1-databricks 3.2.2-SNAPSHOT 3.3.0-SNAPSHOT 3.6.0 @@ -1077,6 +1160,7 @@ ${spark.version.classifier} 1.7.30 1.11.0 + 3.3.1 org/scala-lang/scala-library/${scala.version}/scala-library-${scala.version}.jar ${spark.version.classifier} diff --git a/sql-plugin/pom.xml b/sql-plugin/pom.xml index 35766938d06..9897e7c7293 100644 --- a/sql-plugin/pom.xml +++ b/sql-plugin/pom.xml @@ -171,6 +171,12 @@ ${spark.version} provided + + org.apache.spark + spark-network-shuffle_${scala.binary.version} + ${spark.version} + provided + org.apache.spark spark-launcher_${scala.binary.version} @@ -251,14 +257,8 @@ org.apache.hadoop - hadoop-common - ${spark.version} - provided - - - org.apache.hadoop - hadoop-mapreduce-client - ${spark.version} + hadoop-client + ${hadoop.client.version} provided diff --git a/sql-plugin/src/main/301+-nondb/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala b/sql-plugin/src/main/301+-nondb/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala new file mode 100644 index 00000000000..823c81fe3f2 --- /dev/null +++ b/sql-plugin/src/main/301+-nondb/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.shims + +object AggregationTagging { + // Whether aggregations must be replaced only when both halves are replaced. + val mustReplaceBoth: Boolean = false +} diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/SparkShims.scala index 36b07462d3e..68382bfd084 100644 --- a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.shims import com.nvidia.spark.rapids._ -object SparkShimImpl extends Spark30XdbShims with Spark30Xuntil33XShims { +object SparkShimImpl extends Spark30XdbShims { override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion } diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala new file mode 100644 index 00000000000..823c81fe3f2 --- /dev/null +++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +object AggregationTagging { + // Whether aggregations must be replaced only when both halves are replaced. + val mustReplaceBoth: Boolean = false +} diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala new file mode 100644 index 00000000000..c2810f37d91 --- /dev/null +++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.GpuHashPartitioningBase + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution} + +case class GpuHashPartitioning(expressions: Seq[Expression], numPartitions: Int) + extends GpuHashPartitioningBase(expressions, numPartitions) { + + override def satisfies0(required: Distribution): Boolean = { + super.satisfies0(required) || { + required match { + case h: HashClusteredDistribution => + expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { + case (l, r) => l.semanticEquals(r) + } + case ClusteredDistribution(requiredClustering, _) => + expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) + case _ => false + } + } + } + +} + +object GpuHashPartitioning { + def getDistribution(exprs: Seq[Expression]): Distribution = HashClusteredDistribution(exprs) +} diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/GpuRunningWindowExec.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/GpuRunningWindowExecMeta.scala similarity index 100% rename from sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/GpuRunningWindowExec.scala rename to sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/GpuRunningWindowExecMeta.scala diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala similarity index 100% rename from sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala rename to sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/Spark30XdbShims.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/Spark30XdbShims.scala index dd37911cd50..ea9143802cd 100644 --- a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/Spark30XdbShims.scala +++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/Spark30XdbShims.scala @@ -50,6 +50,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileIndex, F import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters import org.apache.spark.sql.execution.datasources.rapids.GpuPartitioningUtils +import org.apache.spark.sql.execution.datasources.v2.ShowCurrentNamespaceExec import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec} @@ -680,4 +681,13 @@ abstract class Spark30XdbShims extends Spark30XdbShimsBase with Logging { AvoidAdaptiveTransitionToRow(GpuRowToColumnarExec(a, goal)) } + def neverReplaceShowCurrentNamespaceCommand: ExecRule[_ <: SparkPlan] = { + GpuOverrides.neverReplaceExec[ShowCurrentNamespaceExec]("Namespace metadata operation") + } +} + +// First, Last and Collect have mistakenly been marked as non-deterministic until Spark-3.3. +// They are actually deterministic iff their child expression is deterministic. 
+trait GpuDeterministicFirstLastCollectShim extends Expression { + override lazy val deterministic = false } diff --git a/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala b/sql-plugin/src/main/301db/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala similarity index 100% rename from sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala rename to sql-plugin/src/main/301db/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala diff --git a/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/301db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala similarity index 100% rename from sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala rename to sql-plugin/src/main/301db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala diff --git a/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/rapids/shims/Spark30Xuntil33XFileOptionsShims.scala b/sql-plugin/src/main/301db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala similarity index 91% rename from sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/rapids/shims/Spark30Xuntil33XFileOptionsShims.scala rename to sql-plugin/src/main/301db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala index be904decb53..40fa54478af 100644 --- a/sql-plugin/src/main/301until330-all/scala/org/apache/spark/sql/catalyst/json/rapids/shims/Spark30Xuntil33XFileOptionsShims.scala +++ b/sql-plugin/src/main/301db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala @@ -16,12 +16,10 @@ package org.apache.spark.sql.catalyst.json.rapids.shims -import com.nvidia.spark.rapids.SparkShims - import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.catalyst.json.JSONOptions -trait Spark30Xuntil33XFileOptionsShims extends SparkShims { +object FileOptionsShims { def timestampFormatInRead(fileOptions: Serializable): Option[String] = { fileOptions match { diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala b/sql-plugin/src/main/301until330-nondb/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala similarity index 100% rename from sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala rename to sql-plugin/src/main/301until330-nondb/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala diff --git a/sql-plugin/src/main/301until330-nondb/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/301until330-nondb/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala new file mode 100644 index 00000000000..e3ee4ccd6be --- /dev/null +++ b/sql-plugin/src/main/301until330-nondb/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +object RapidsErrorUtils { + def invalidArrayIndexError(index: Int, numElements: Int, + isElementAtF: Boolean = false): ArrayIndexOutOfBoundsException = { + // Follow the Spark string format before 3.3.0 + new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: $numElements") + } + + def mapKeyNotExistError(key: String, isElementAtF: Boolean = false): NoSuchElementException = { + // Follow the Spark string format before 3.3.0 + new NoSuchElementException(s"Key $key does not exist.") + } + + def sqlArrayIndexNotStartAtOneError(): ArrayIndexOutOfBoundsException = { + new ArrayIndexOutOfBoundsException("SQL array indices start at 1") + } +} diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/Spark30Xuntil33XShims.scala b/sql-plugin/src/main/301until330-nondb/scala/com/nvidia/spark/rapids/shims/Spark30Xuntil33XShims.scala similarity index 89% rename from sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/Spark30Xuntil33XShims.scala rename to sql-plugin/src/main/301until330-nondb/scala/com/nvidia/spark/rapids/shims/Spark30Xuntil33XShims.scala index 44bafb21d83..8ab1f4b6719 100644 --- a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/Spark30Xuntil33XShims.scala +++ b/sql-plugin/src/main/301until330-nondb/scala/com/nvidia/spark/rapids/shims/Spark30Xuntil33XShims.scala @@ -19,11 +19,10 @@ package com.nvidia.spark.rapids.shims import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.json.rapids.shims.Spark30Xuntil33XFileOptionsShims import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.v2._ -trait Spark30Xuntil33XShims extends Spark30Xuntil33XFileOptionsShims { +trait Spark30Xuntil33XShims extends SparkShims { def neverReplaceShowCurrentNamespaceCommand: ExecRule[_ <: SparkPlan] = { GpuOverrides.neverReplaceExec[ShowCurrentNamespaceExec]("Namespace metadata operation") diff --git a/sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala b/sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala new file mode 100644 index 00000000000..b8736640a9f --- /dev/null +++ b/sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.csv + +object GpuCsvUtils { + def dateFormatInRead(options: CSVOptions): String = options.dateFormat +} diff --git a/sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala new file mode 100644 index 00000000000..b22da8a4f71 --- /dev/null +++ b/sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.json + +object GpuJsonUtils { + def dateFormatInRead(options: JSONOptions): String = options.dateFormat +} diff --git a/sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala b/sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala new file mode 100644 index 00000000000..40fa54478af --- /dev/null +++ b/sql-plugin/src/main/301until330-nondb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.json.rapids.shims + +import org.apache.spark.sql.catalyst.csv.CSVOptions +import org.apache.spark.sql.catalyst.json.JSONOptions + +object FileOptionsShims { + + def timestampFormatInRead(fileOptions: Serializable): Option[String] = { + fileOptions match { + case csvOpts: CSVOptions => Option(csvOpts.timestampFormat) + case jsonOpts: JSONOptions => Option(jsonOpts.timestampFormat) + case _ => throw new RuntimeException("Wrong file options.") + } + } + +} diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala similarity index 100% rename from sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala rename to sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/GpuRegExpReplaceExec.scala b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/GpuRegExpReplaceExec.scala similarity index 100% rename from sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/GpuRegExpReplaceExec.scala rename to sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/GpuRegExpReplaceExec.scala diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/GpuRunningWindowExec.scala b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/GpuRunningWindowExecMeta.scala similarity index 100% rename from sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/GpuRunningWindowExec.scala rename to sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/GpuRunningWindowExecMeta.scala diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala similarity index 100% rename from sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala rename to sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala b/sql-plugin/src/main/311+-db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala similarity index 100% rename from sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala rename to sql-plugin/src/main/311+-db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/rapids/shims/GpuFileScanRDD.scala b/sql-plugin/src/main/311+-db/scala/org/apache/spark/sql/rapids/shims/GpuFileScanRDD.scala similarity index 100% rename from sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/rapids/shims/GpuFileScanRDD.scala rename to sql-plugin/src/main/311+-db/scala/org/apache/spark/sql/rapids/shims/GpuFileScanRDD.scala diff --git a/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala b/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala new file mode 100644 index 00000000000..c2810f37d91 --- /dev/null +++ b/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.GpuHashPartitioningBase + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution} + +case class GpuHashPartitioning(expressions: Seq[Expression], numPartitions: Int) + extends GpuHashPartitioningBase(expressions, numPartitions) { + + override def satisfies0(required: Distribution): Boolean = { + super.satisfies0(required) || { + required match { + case h: HashClusteredDistribution => + expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { + case (l, r) => l.semanticEquals(r) + } + case ClusteredDistribution(requiredClustering, _) => + expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) + case _ => false + } + } + } + +} + +object GpuHashPartitioning { + def getDistribution(exprs: Seq[Expression]): Distribution = HashClusteredDistribution(exprs) +} diff --git a/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index fa544e24dff..fe6cd166a30 100644 --- a/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -22,7 +22,7 @@ import org.apache.parquet.schema.MessageType import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters -object SparkShimImpl extends Spark31XdbShims with Spark30Xuntil33XShims { +object SparkShimImpl extends Spark31XdbShims { override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion @@ -41,5 +41,6 @@ object SparkShimImpl extends Spark31XdbShims with Spark30Xuntil33XShims { new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith, pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode) } + override def isCastingStringToNegDecimalScaleSupported: Boolean = false } diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala new file mode 100644 index 00000000000..823c81fe3f2 --- /dev/null +++ b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +object AggregationTagging { + // Whether aggregations must be replaced only when both halves are replaced. + val mustReplaceBoth: Boolean = false +} diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala new file mode 100644 index 00000000000..e3ee4ccd6be --- /dev/null +++ b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +object RapidsErrorUtils { + def invalidArrayIndexError(index: Int, numElements: Int, + isElementAtF: Boolean = false): ArrayIndexOutOfBoundsException = { + // Follow the Spark string format before 3.3.0 + new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: $numElements") + } + + def mapKeyNotExistError(key: String, isElementAtF: Boolean = false): NoSuchElementException = { + // Follow the Spark string format before 3.3.0 + new NoSuchElementException(s"Key $key does not exist.") + } + + def sqlArrayIndexNotStartAtOneError(): ArrayIndexOutOfBoundsException = { + new ArrayIndexOutOfBoundsException("SQL array indices start at 1") + } +} diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala index f9c255132e7..5ab084f95a2 100644 --- a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala +++ b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/Spark31XdbShims.scala @@ -50,6 +50,7 @@ import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsComman import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.rapids.GpuPartitioningUtils +import org.apache.spark.sql.execution.datasources.v2.ShowCurrentNamespaceExec import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ReusedExchangeExec, ShuffleExchangeExec} @@ -820,4 +821,13 @@ abstract class Spark31XdbShims extends Spark31XdbShimsBase with Logging { AvoidAdaptiveTransitionToRow(GpuRowToColumnarExec(a, goal)) } + def neverReplaceShowCurrentNamespaceCommand: ExecRule[_ <: 
SparkPlan] = { + GpuOverrides.neverReplaceExec[ShowCurrentNamespaceExec]("Namespace metadata operation") + } +} + +// First, Last and Collect have mistakenly been marked as non-deterministic until Spark-3.3. +// They are actually deterministic iff their child expression is deterministic. +trait GpuDeterministicFirstLastCollectShim extends Expression { + override lazy val deterministic = false } diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala new file mode 100644 index 00000000000..b8736640a9f --- /dev/null +++ b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +object GpuCsvUtils { + def dateFormatInRead(options: CSVOptions): String = options.dateFormat +} diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala new file mode 100644 index 00000000000..b22da8a4f71 --- /dev/null +++ b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.json + +object GpuJsonUtils { + def dateFormatInRead(options: JSONOptions): String = options.dateFormat +} diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala new file mode 100644 index 00000000000..40fa54478af --- /dev/null +++ b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.json.rapids.shims + +import org.apache.spark.sql.catalyst.csv.CSVOptions +import org.apache.spark.sql.catalyst.json.JSONOptions + +object FileOptionsShims { + + def timestampFormatInRead(fileOptions: Serializable): Option[String] = { + fileOptions match { + case csvOpts: CSVOptions => Option(csvOpts.timestampFormat) + case jsonOpts: JSONOptions => Option(jsonOpts.timestampFormat) + case _ => throw new RuntimeException("Wrong file options.") + } + } + +} diff --git a/sql-plugin/src/main/320+-nondb/scala/com/nvidia/spark/rapids/shims/Spark320PlusNonDBShims.scala b/sql-plugin/src/main/320+-nondb/scala/com/nvidia/spark/rapids/shims/Spark320PlusNonDBShims.scala new file mode 100644 index 00000000000..4492fa6d958 --- /dev/null +++ b/sql-plugin/src/main/320+-nondb/scala/com/nvidia/spark/rapids/shims/Spark320PlusNonDBShims.scala @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.SparkShims +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.python.WindowInPandasExec + +/** + * Shim methods that can be compiled with every supported 3.2.0+ except Databricks versions + */ +trait Spark320PlusNonDBShims extends SparkShims { + + override final def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any = + mode.transform(rows) + + override final def newBroadcastQueryStageExec( + old: BroadcastQueryStageExec, + newPlan: SparkPlan): BroadcastQueryStageExec = + BroadcastQueryStageExec(old.id, newPlan, old._canonicalized) + + override final def filesFromFileIndex(fileIndex: PartitioningAwareFileIndex): Seq[FileStatus] = { + fileIndex.allFiles() + } + + def getWindowExpressions(winPy: WindowInPandasExec): Seq[NamedExpression] = winPy.windowExpression +} diff --git a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala index 37a61b7b32c..07519e1d86f 100644 --- a/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala +++ b/sql-plugin/src/main/320+/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala @@ -28,7 +28,7 @@ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuOverrides.exec import org.apache.arrow.memory.ReferenceManager import org.apache.arrow.vector.ValueVector -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.Path import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging @@ -74,6 +74,8 @@ import org.apache.spark.unsafe.types.CalendarInterval */ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging { + def getWindowExpressions(winPy: WindowInPandasExec): Seq[NamedExpression] + override final def aqeShuffleReaderExec: ExecRule[_ <: SparkPlan] = exec[AQEShuffleReadExec]( "A wrapper of shuffle query stage", ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 + TypeSig.ARRAY + @@ -84,12 +86,6 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging { plan.session } - override final def filesFromFileIndex( - fileIndex: PartitioningAwareFileIndex - ): Seq[FileStatus] = { - fileIndex.allFiles() - } - override def isEmptyRelation(relation: Any): Boolean = relation match { case EmptyHashedRelation => true case arr: Array[InternalRow] if arr.isEmpty => true @@ -100,14 +96,6 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging { Some(broadcastModeTransform(mode, Array.empty)).filter(isEmptyRelation) } - override final def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any = - mode.transform(rows) - - override final def newBroadcastQueryStageExec( - old: BroadcastQueryStageExec, - newPlan: SparkPlan): BroadcastQueryStageExec = - BroadcastQueryStageExec(old.id, newPlan, old._canonicalized) - override final def isExchangeOp(plan: SparkPlanMeta[_]): Boolean = { // if the child query stage already executed on GPU then we need to keep the // next operator on GPU in these 
cases @@ -512,7 +500,7 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging { TypeSig.all), (winPy, conf, p, r) => new GpuWindowInPandasExecMetaBase(winPy, conf, p, r) { override val windowExpressions: Seq[BaseExprMeta[NamedExpression]] = - winPy.windowExpression.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + getWindowExpressions(winPy).map(GpuOverrides.wrapExpr(_, conf, Some(this))) override def convertToGpu(): GpuExec = { GpuWindowInPandasExec( diff --git a/sql-plugin/src/main/320/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/320/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index fa184143355..ebe3e8e91d4 100644 --- a/sql-plugin/src/main/320/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/320/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -27,7 +27,9 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartitio import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters import org.apache.spark.sql.types.StructType -object SparkShimImpl extends Spark320PlusShims with Spark30Xuntil33XShims { +object SparkShimImpl extends Spark320PlusShims + with Spark320PlusNonDBShims + with Spark30Xuntil33XShims { override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion override def getFileScanRDD( diff --git a/sql-plugin/src/main/321/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/321/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index 255b58f1921..f51e36c78ec 100644 --- a/sql-plugin/src/main/321/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/321/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -25,7 +25,9 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} import org.apache.spark.sql.types.StructType -object SparkShimImpl extends Spark321PlusShims with Spark30Xuntil33XShims { +object SparkShimImpl extends Spark321PlusShims + with Spark320PlusNonDBShims + with Spark30Xuntil33XShims { override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion override def getFileScanRDD( diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala new file mode 100644 index 00000000000..de7f92c32a1 --- /dev/null +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AggregationTagging.scala @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +object AggregationTagging { + // Whether aggregations must be replaced only when both halves are replaced. 
+ // Setting this to true due to https://github.com/NVIDIA/spark-rapids/issues/4963 + val mustReplaceBoth: Boolean = true +} diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala new file mode 100644 index 00000000000..1cc3d317fbb --- /dev/null +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/GpuHashPartitioning.scala @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.GpuHashPartitioningBase + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, StatefulOpClusteredDistribution} + +case class GpuHashPartitioning(expressions: Seq[Expression], numPartitions: Int) + extends GpuHashPartitioningBase(expressions, numPartitions) { + + override def satisfies0(required: Distribution): Boolean = { + super.satisfies0(required) || { + required match { + case h: StatefulOpClusteredDistribution => + expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { + case (l, r) => l.semanticEquals(r) + } + case ClusteredDistribution(requiredClustering, _) => + expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) + case _ => false + } + } + } +} + +object GpuHashPartitioning { + def getDistribution(exprs: Seq[Expression]): Distribution = ClusteredDistribution(exprs) +} diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala new file mode 100644 index 00000000000..b84316a8f2e --- /dev/null +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/RapidsErrorUtils.scala @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.shims + +import org.apache.spark.sql.errors.QueryExecutionErrors + +object RapidsErrorUtils { + def invalidArrayIndexError(index: Int, numElements: Int, + isElementAtF: Boolean = false): ArrayIndexOutOfBoundsException = { + if (isElementAtF) { + QueryExecutionErrors.invalidElementAtIndexError(index, numElements) + } else { + QueryExecutionErrors.invalidArrayIndexError(index, numElements) + } + } + + def mapKeyNotExistError(key: String, isElementAtF: Boolean = false): NoSuchElementException = { + // For now, the default argument is false. The caller sets the correct value accordingly. + QueryExecutionErrors.mapKeyNotExistError(key) + } + + def sqlArrayIndexNotStartAtOneError(): ArrayIndexOutOfBoundsException = { + new ArrayIndexOutOfBoundsException("SQL array indices start at 1") + } +} diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimBroadcastExchangeLike.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimBroadcastExchangeLike.scala new file mode 100644 index 00000000000..e4545ccf42a --- /dev/null +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimBroadcastExchangeLike.scala @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +import scala.concurrent.Promise + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike + +/** + * This shim handles the completion future differences between + * Apache Spark and Databricks. + */ +trait ShimBroadcastExchangeLike extends BroadcastExchangeLike { + @transient + protected lazy val promise = Promise[Broadcast[Any]]() + + override def completionFuture: concurrent.Future[Broadcast[Any]] = promise.future +} diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala new file mode 100644 index 00000000000..cc8570fea31 --- /dev/null +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.shims + +import com.databricks.sql.execution.window.RunningWindowFunctionExec +import com.nvidia.spark.rapids._ +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.json.JsonFileFormat +import org.apache.spark.sql.execution.python.WindowInPandasExec +import org.apache.spark.sql.execution.window.WindowExecBase +import org.apache.spark.sql.rapids.GpuFileSourceScanExec +import org.apache.spark.sql.rapids.shims.GpuFileScanRDD +import org.apache.spark.sql.types._ + +object SparkShimImpl extends Spark321PlusShims { + + override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion + + override def isCastingStringToNegDecimalScaleSupported: Boolean = true + + override def getFileScanRDD( + sparkSession: SparkSession, + readFunction: PartitionedFile => Iterator[InternalRow], + filePartitions: Seq[FilePartition], + readDataSchema: StructType, + metadataColumns: Seq[AttributeReference]): RDD[InternalRow] = { + new GpuFileScanRDD(sparkSession, readFunction, filePartitions) + } + + override def broadcastModeTransform(mode: BroadcastMode, rows: Array[InternalRow]): Any = { + // In some cases we can be asked to transform when there's no task context, which appears to + // be new behavior since Databricks 10.4. A task memory manager must be passed, so if one is + // not available we construct one from the main memory manager using a task attempt ID of 0. + val memoryManager = Option(TaskContext.get).map(_.taskMemoryManager()).getOrElse { + new TaskMemoryManager(SparkEnv.get.memoryManager, 0) + } + mode.transform(rows, memoryManager) + } + + override def newBroadcastQueryStageExec( + old: BroadcastQueryStageExec, + newPlan: SparkPlan): BroadcastQueryStageExec = + BroadcastQueryStageExec(old.id, newPlan, old.originalPlan) + + override def filesFromFileIndex(fileCatalog: PartitioningAwareFileIndex): Seq[FileStatus] = { + fileCatalog.allFiles().map(_.toFileStatus) + } + + override def neverReplaceShowCurrentNamespaceCommand: ExecRule[_ <: SparkPlan] = null + + override def getWindowExpressions(winPy: WindowInPandasExec): Seq[NamedExpression] = + winPy.projectList + + override def isWindowFunctionExec(plan: SparkPlan): Boolean = + plan.isInstanceOf[WindowExecBase] || plan.isInstanceOf[RunningWindowFunctionExec] + + private val shimExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = { + Seq( + GpuOverrides.exec[FileSourceScanExec]( + "Reading data from files, often from Hive tables", + ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.STRUCT + TypeSig.MAP + + TypeSig.ARRAY + TypeSig.DECIMAL_128).nested(), TypeSig.all), + (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) { + + // Replaces SubqueryBroadcastExec inside dynamic pruning filters with GPU counterpart + // if possible. Instead regarding filters as childExprs of current Meta, we create + // a new meta for SubqueryBroadcastExec. 
The reason is that the GPU replacement of + FileSourceScan is independent of the replacement of the partitionFilters. It is + possible that the FileSourceScan is on the CPU, while the dynamic partitionFilters + are on the GPU. And vice versa. + private lazy val partitionFilters = { + val convertBroadcast = (bc: SubqueryBroadcastExec) => { + val meta = GpuOverrides.wrapAndTagPlan(bc, conf) + meta.tagForExplain() + meta.convertIfNeeded().asInstanceOf[BaseSubqueryExec] + } + wrapped.partitionFilters.map { filter => + filter.transformDown { + case dpe @ DynamicPruningExpression(inSub: InSubqueryExec) => + inSub.plan match { + case bc: SubqueryBroadcastExec => + dpe.copy(inSub.copy(plan = convertBroadcast(bc))) + case reuse @ ReusedSubqueryExec(bc: SubqueryBroadcastExec) => + dpe.copy(inSub.copy(plan = reuse.copy(convertBroadcast(bc)))) + case _ => + dpe + } + } + } + } + + // partition filters and data filters are not run on the GPU + override val childExprs: Seq[ExprMeta[_]] = Seq.empty + + override def tagPlanForGpu(): Unit = { + // this is a very specific check to have any of the Delta log metadata queries + fall back and run on the CPU since there are some incompatibilities between + Databricks Spark and Apache Spark. + if (wrapped.relation.fileFormat.isInstanceOf[JsonFileFormat] && + wrapped.relation.location.getClass.getCanonicalName() == + "com.databricks.sql.transaction.tahoe.DeltaLogFileIndex") { + this.entirePlanWillNotWork("Plans that read Delta Index JSON files can not run " + + "any part of the plan on the GPU!") + } + GpuFileSourceScanExec.tagSupport(this) + } + + override def convertToCpu(): SparkPlan = { + wrapped.copy(partitionFilters = partitionFilters) + } + + override def convertToGpu(): GpuExec = { + val sparkSession = wrapped.relation.sparkSession + val options = wrapped.relation.options + + val location = replaceWithAlluxioPathIfNeeded( + conf, + wrapped.relation, + partitionFilters, + wrapped.dataFilters) + + val newRelation = HadoopFsRelation( + location, + wrapped.relation.partitionSchema, + wrapped.relation.dataSchema, + wrapped.relation.bucketSpec, + GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), + options)(sparkSession) + + GpuFileSourceScanExec( + newRelation, + wrapped.output, + wrapped.requiredSchema, + partitionFilters, + wrapped.optionalBucketSet, + // TODO: Does Databricks have coalesced bucketing implemented? + None, + wrapped.dataFilters, + wrapped.tableIdentifier, + wrapped.disableBucketedScan)(conf) + } + }), + GpuOverrides.exec[RunningWindowFunctionExec]( + "Databricks-specific window function exec, for \"running\" windows, " + + "i.e. 
(UNBOUNDED PRECEDING TO CURRENT ROW)", + ExecChecks( + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 + + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(), + TypeSig.all, + Map("partitionSpec" -> + InputCheck(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128, + TypeSig.all))), + (runningWindowFunctionExec, conf, p, r) => + new GpuRunningWindowExecMeta(runningWindowFunctionExec, conf, p, r) + ) + ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap + } + + override def getExecs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = + super.getExecs ++ shimExecs +} + +// Fallback to the default definition of `deterministic` +trait GpuDeterministicFirstLastCollectShim extends Expression diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/spark321db/SparkShimServiceProvider.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/spark321db/SparkShimServiceProvider.scala new file mode 100644 index 00000000000..357b6a03f08 --- /dev/null +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/spark321db/SparkShimServiceProvider.scala @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims.spark321db + +import com.nvidia.spark.rapids.{DatabricksShimVersion, ShimVersion} + +import org.apache.spark.SparkEnv + +object SparkShimServiceProvider { + val VERSION = DatabricksShimVersion(3, 2, 1) +} + +class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider { + + override def getShimVersion: ShimVersion = SparkShimServiceProvider.VERSION + + def matchesVersion(version: String): Boolean = { + SparkEnv.get.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "").startsWith("10.4.") + } +} diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/spark321db/RapidsShuffleManager.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/spark321db/RapidsShuffleManager.scala new file mode 100644 index 00000000000..82c231908f2 --- /dev/null +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/spark321db/RapidsShuffleManager.scala @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.spark321db + +import org.apache.spark.SparkConf +import org.apache.spark.sql.rapids.shims.spark321db.ProxyRapidsShuffleInternalManager + +/** A shuffle manager optimized for the RAPIDS Plugin for Apache Spark. */ +sealed class RapidsShuffleManager( + conf: SparkConf, + isDriver: Boolean) extends ProxyRapidsShuffleInternalManager(conf, isDriver) diff --git a/sql-plugin/src/main/321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala new file mode 100644 index 00000000000..44d61961b40 --- /dev/null +++ b/sql-plugin/src/main/321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.rapids.shims + +import scala.concurrent.Future + +import com.nvidia.spark.rapids.GpuPartitioning + +import org.apache.spark.MapOutputStatistics +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan} +import org.apache.spark.sql.execution.exchange.{ShuffleExchangeLike, ShuffleOrigin} +import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, ShuffledBatchRDD} + +case class GpuShuffleExchangeExec( + gpuOutputPartitioning: GpuPartitioning, + child: SparkPlan, + shuffleOrigin: ShuffleOrigin)( + cpuOutputPartitioning: Partitioning) + extends GpuShuffleExchangeExecBase(gpuOutputPartitioning, child) with ShuffleExchangeLike { + + override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil + + override val outputPartitioning: Partitioning = cpuOutputPartitioning + + // 'mapOutputStatisticsFuture' is only needed when AQE is enabled. 
+ override def mapOutputStatisticsFuture: Future[MapOutputStatistics] = + if (inputBatchRDD.getNumPartitions == 0) { + Future.successful(null) + } else { + sparkContext.submitMapStage(shuffleDependencyColumnar) + } + + override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions + + override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions + + override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[_] = { + new ShuffledBatchRDD(shuffleDependencyColumnar, metrics ++ readMetrics, partitionSpecs) + } + + // DB SPECIFIC - throw if called since we don't know how its used + override def withNewOutputPartitioning(outputPartitioning: Partitioning) = { + throw new UnsupportedOperationException + } + + override def runtimeStatistics: Statistics = { + // note that Spark will only use the sizeInBytes statistic but making the rowCount + // available here means that we can more easily reference it in GpuOverrides when + // planning future query stages when AQE is on + Statistics( + sizeInBytes = metrics("dataSize").value, + rowCount = Some(metrics("numOutputRows").value) + ) + } + + override def shuffleId: Int = shuffleDependencyColumnar.shuffleId +} diff --git a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala new file mode 100644 index 00000000000..2b7e5b2193a --- /dev/null +++ b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/csv/GpuCsvUtils.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +import org.apache.spark.sql.catalyst.util.DateFormatter + +object GpuCsvUtils { + def dateFormatInRead(options: CSVOptions): String = + options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) +} diff --git a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala new file mode 100644 index 00000000000..cd112da4e7a --- /dev/null +++ b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.json + +import org.apache.spark.sql.catalyst.util.DateFormatter + +object GpuJsonUtils { + def dateFormatInRead(options: JSONOptions): String = + options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) +} diff --git a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/Spark33XFileOptionsShims.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala similarity index 90% rename from sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/Spark33XFileOptionsShims.scala rename to sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala index 092f86a6504..e33d4b085fa 100644 --- a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/Spark33XFileOptionsShims.scala +++ b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala @@ -16,12 +16,10 @@ package org.apache.spark.sql.catalyst.json.rapids.shims -import com.nvidia.spark.rapids.shims.Spark321PlusShims - import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.catalyst.json.JSONOptions -trait Spark33XFileOptionsShims extends Spark321PlusShims { +object FileOptionsShims { def timestampFormatInRead(fileOptions: Serializable): Option[String] = { fileOptions match { diff --git a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala new file mode 100644 index 00000000000..62fb71dcee9 --- /dev/null +++ b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.rapids.execution.python.shims + +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.python.PythonWorkerSemaphore +import com.nvidia.spark.rapids.shims.ShimUnaryExecNode +import com.nvidia.spark.rapids.shims.SparkShimImpl + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasExec +import org.apache.spark.sql.rapids.execution.python.{GpuArrowPythonRunner, GpuPythonExecBase, GpuPythonHelper, GpuPythonUDF, GroupArgs} +import org.apache.spark.sql.rapids.execution.python.BatchGroupUtils._ +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.util.ArrowUtils +import org.apache.spark.sql.vectorized.ColumnarBatch + +class GpuFlatMapGroupsInPandasExecMeta( + flatPandas: FlatMapGroupsInPandasExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends SparkPlanMeta[FlatMapGroupsInPandasExec](flatPandas, conf, parent, rule) { + + override def replaceMessage: String = "partially run on GPU" + override def noReplacementPossibleMessage(reasons: String): String = + s"cannot run even partially on the GPU because $reasons" + + private val groupingAttrs: Seq[BaseExprMeta[Attribute]] = + flatPandas.groupingAttributes.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + + private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr( + flatPandas.func.asInstanceOf[PythonUDF], conf, Some(this)) + + private val resultAttrs: Seq[BaseExprMeta[Attribute]] = + flatPandas.output.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + + override val childExprs: Seq[BaseExprMeta[_]] = groupingAttrs ++ resultAttrs :+ udf + + override def convertToGpu(): GpuExec = + GpuFlatMapGroupsInPandasExec( + groupingAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], + udf.convertToGpu(), + resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], + childPlans.head.convertIfNeeded() + ) +} + +/** + * GPU version of Spark's `FlatMapGroupsInPandasExec` + * + * Rows in each group are passed to the Python worker as an Arrow record batch. + * The Python worker turns the record batch into a `pandas.DataFrame`, invokes the + * user-defined function, and passes the resulting `pandas.DataFrame` + * as an Arrow record batch. Finally, each record batch is turned into + * a ColumnarBatch. + * + * This node aims at accelerating the data transfer between JVM and Python for the GPU pipeline, and + * scheduling GPU resources for its Python processes. 
+ */ +case class GpuFlatMapGroupsInPandasExec( + groupingAttributes: Seq[Attribute], + func: Expression, + output: Seq[Attribute], + child: SparkPlan) + extends SparkPlan with ShimUnaryExecNode with GpuPythonExecBase { + + override def producedAttributes: AttributeSet = AttributeSet(output) + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def requiredChildDistribution: Seq[Distribution] = { + if (groupingAttributes.isEmpty) { + AllTuples :: Nil + } else { + ClusteredDistribution(groupingAttributes) :: Nil + } + } + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = + Seq(groupingAttributes.map(SparkShimImpl.sortOrder(_, Ascending))) + + private val pandasFunction = func.asInstanceOf[GpuPythonUDF].func + + // One batch as input to keep the integrity for each group + override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(RequireSingleBatch) + + // The input batch will be split into multiple batches by grouping expression, and + // processed by Python executors group by group, so better to coalesce the output batches. + override def coalesceAfter: Boolean = true + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val (mNumInputRows, mNumInputBatches, mNumOutputRows, mNumOutputBatches, + spillCallback) = commonGpuMetrics() + + lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf) + val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) + val sessionLocalTimeZone = conf.sessionLocalTimeZone + val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) + val localOutput = output + val localChildOutput = child.output + // Python wraps the resulting columns in a single struct column. + val pythonOutputSchema = StructType( + StructField("out_struct", StructType.fromAttributes(localOutput)) :: Nil) + + // Configs from DB 10.4 runtime + val maxBytes = conf.pandasZeroConfConversionMaxBytesPerSlice + val zeroConfEnabled = conf.pandasZeroConfConversionEnabled + + // Resolve the argument offsets and related attributes. + val GroupArgs(dedupAttrs, argOffsets, groupingOffsets) = + resolveArgOffsets(child, groupingAttributes) + + // Start processing. Map grouped batches to ArrowPythonRunner results. + child.executeColumnar().mapPartitionsInternal { inputIter => + if (isPythonOnGpuEnabled) { + GpuPythonHelper.injectGpuInfo(chainedFunc, isPythonOnGpuEnabled) + PythonWorkerSemaphore.acquireIfNecessary(TaskContext.get()) + } + + // Projects each input batch into the deduplicated schema, and splits + // into separate group batches to send them to Python group by group later. + val pyInputIter = projectAndGroup(inputIter, localChildOutput, dedupAttrs, groupingOffsets, + mNumInputRows, mNumInputBatches, spillCallback) + + if (pyInputIter.hasNext) { + // Launch Python workers only when the data is not empty. + // Choose the right DB SPECIFIC serializer from 9.1 runtime. + val pyRunner = if (zeroConfEnabled && maxBytes > 0L) { + new GpuGroupUDFArrowPythonRunner( + chainedFunc, + PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, + Array(argOffsets), + StructType.fromAttributes(dedupAttrs), + sessionLocalTimeZone, + pythonRunnerConf, + // The whole group data should be written in a single call, so here is unlimited + Int.MaxValue, + spillCallback.semaphoreWaitTime, + onDataWriteFinished = null, + pythonOutputSchema, + // We cannot assert that the result batch from Python has the same row count as the + // input batch, because Grouped Map UDF allows the output of arbitrary length. 
+ // So try to read as many as possible by specifying `minReadTargetBatchSize` as + // `Int.MaxValue` here. + Int.MaxValue) + } else { + new GpuArrowPythonRunner( + chainedFunc, + PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, + Array(argOffsets), + StructType.fromAttributes(dedupAttrs), + sessionLocalTimeZone, + pythonRunnerConf, + Int.MaxValue, + spillCallback.semaphoreWaitTime, + onDataWriteFinished = null, + pythonOutputSchema, + Int.MaxValue) + } + + executePython(pyInputIter, localOutput, pyRunner, mNumOutputRows, mNumOutputBatches) + } else { + // Empty partition, return it directly + inputIter + } + } // end of mapPartitionsInternal + } +} diff --git a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/shims/spark321db/RapidsShuffleInternalManager.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/shims/spark321db/RapidsShuffleInternalManager.scala new file mode 100644 index 00000000000..a692d22a810 --- /dev/null +++ b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/shims/spark321db/RapidsShuffleInternalManager.scala @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.rapids.shims.spark321db + +import org.apache.spark.{SparkConf, TaskContext} +import org.apache.spark.shuffle._ +import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase} + +/** + * A shuffle manager optimized for the RAPIDS Plugin For Apache Spark. + * @note This is an internal class to obtain access to the private + * `ShuffleManager` and `SortShuffleManager` classes. 
+ */ +class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean) + extends RapidsShuffleInternalManagerBase(conf, isDriver) { + + def getReader[K, C]( + handle: ShuffleHandle, + startMapIndex: Int, + endMapIndex: Int, + startPartition: Int, + endPartition: Int, + context: TaskContext, + metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { + getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context, + metrics) + } +} + +class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean) + extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager { + + def getReader[K, C]( + handle: ShuffleHandle, + startMapIndex: Int, + endMapIndex: Int, + startPartition: Int, + endPartition: Int, + context: TaskContext, + metrics: ShuffleReadMetricsReporter + ): ShuffleReader[K, C] = { + self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context, + metrics) + } +} diff --git a/sql-plugin/src/main/322/scala/com/nvidia/spark/rapids/shims/SparkShims.scala b/sql-plugin/src/main/322/scala/com/nvidia/spark/rapids/shims/SparkShims.scala index 255b58f1921..f51e36c78ec 100644 --- a/sql-plugin/src/main/322/scala/com/nvidia/spark/rapids/shims/SparkShims.scala +++ b/sql-plugin/src/main/322/scala/com/nvidia/spark/rapids/shims/SparkShims.scala @@ -25,7 +25,9 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} import org.apache.spark.sql.types.StructType -object SparkShimImpl extends Spark321PlusShims with Spark30Xuntil33XShims { +object SparkShimImpl extends Spark321PlusShims + with Spark320PlusNonDBShims + with Spark30Xuntil33XShims { override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion override def getFileScanRDD( diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala index a938d84a75a..7bae6214eaf 100644 --- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala +++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/Spark33XShims.scala @@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.json.rapids.shims.Spark33XFileOptionsShims import org.apache.spark.sql.execution.{BaseSubqueryExec, CoalesceExec, FileSourceScanExec, FilterExec, InSubqueryExec, ProjectExec, ReusedSubqueryExec, SparkPlan, SubqueryBroadcastExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.command.DataWritingCommandExec @@ -39,7 +38,7 @@ import org.apache.spark.sql.rapids.shims.GpuTimeAdd import org.apache.spark.sql.types.{CalendarIntervalType, DayTimeIntervalType, StructType} import org.apache.spark.unsafe.types.CalendarInterval -trait Spark33XShims extends Spark33XFileOptionsShims { +trait Spark33XShims extends Spark321PlusShims with Spark320PlusNonDBShims { /** * For spark3.3+ optionally return null if element not exists. 
diff --git a/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala
new file mode 100644
index 00000000000..e33d4b085fa
--- /dev/null
+++ b/sql-plugin/src/main/330+/scala/org/apache/spark/sql/catalyst/json/rapids/shims/FileOptionsShims.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json.rapids.shims
+
+import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.catalyst.json.JSONOptions
+
+object FileOptionsShims {
+
+  def timestampFormatInRead(fileOptions: Serializable): Option[String] = {
+    fileOptions match {
+      case csvOpts: CSVOptions => csvOpts.timestampFormatInRead
+      case jsonOpts: JSONOptions => jsonOpts.timestampFormatInRead
+      case _ => throw new RuntimeException("Wrong file options.")
+    }
+  }
+
+}
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala
index 3f83ec05cff..15ab0b4987d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConverters._
 import ai.rapids.cudf
 import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table}
-import com.nvidia.spark.rapids.shims.SparkShimImpl
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -32,6 +31,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.csv.{CSVOptions, GpuCsvUtils}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.json.rapids.shims.FileOptionsShims
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.util.PermissiveMode
 import org.apache.spark.sql.connector.read._
@@ -231,7 +231,7 @@ object GpuCSVScan {
     if (!TypeChecks.areTimestampsSupported(parsedOptions.zoneId)) {
       meta.willNotWorkOnGpu("Only UTC zone id is supported")
     }
-    SparkShimImpl.timestampFormatInRead(parsedOptions).foreach { tsFormat =>
+    FileOptionsShims.timestampFormatInRead(parsedOptions).foreach { tsFormat =>
       val parts = tsFormat.split("'T'", 2)
       if (parts.isEmpty) {
         meta.willNotWorkOnGpu(s"the timestamp format '$tsFormat' is not supported")
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala
index d3ca0766c26..d7464df35b2 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala
@@ -304,8 +304,6 @@ trait SparkShims {
    */
   def getLegacyStatisticalAggregate(): Boolean
- def
timestampFormatInRead(fileOptions: Serializable): Option[String] - def neverReplaceShowCurrentNamespaceCommand: ExecRule[_ <: SparkPlan] /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 00641748777..382a3a4a4a5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -25,7 +25,7 @@ import ai.rapids.cudf import ai.rapids.cudf.NvtxColor import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.shims.{ShimUnaryExecNode, SparkShimImpl} +import com.nvidia.spark.rapids.shims.{AggregationTagging, ShimUnaryExecNode, SparkShimImpl} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -802,6 +802,26 @@ class GpuHashAggregateIterator( } } +object GpuBaseAggregateMeta { + private val aggPairReplaceChecked = TreeNodeTag[Boolean]( + "rapids.gpu.aggPairReplaceChecked") + + def getAggregateOfAllStages( + currentMeta: SparkPlanMeta[_], logical: LogicalPlan): List[GpuBaseAggregateMeta[_]] = { + currentMeta match { + case aggMeta: GpuBaseAggregateMeta[_] if aggMeta.agg.logicalLink.contains(logical) => + List[GpuBaseAggregateMeta[_]](aggMeta) ++ + getAggregateOfAllStages(aggMeta.childPlans.head, logical) + case shuffleMeta: GpuShuffleMeta => + getAggregateOfAllStages(shuffleMeta.childPlans.head, logical) + case sortMeta: GpuSortMeta => + getAggregateOfAllStages(sortMeta.childPlans.head, logical) + case _ => + List[GpuBaseAggregateMeta[_]]() + } + } +} + abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( plan: INPUT, aggRequiredChildDistributionExpressions: Option[Seq[Expression]], @@ -843,6 +863,10 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( willNotWorkOnGpu( "DISTINCT and FILTER cannot be used in aggregate functions at the same time") } + + if (AggregationTagging.mustReplaceBoth) { + tagForMixedReplacement() + } } /** @@ -908,6 +932,34 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( } } + /** Prevent mixing of CPU and GPU aggregations */ + private def tagForMixedReplacement(): Unit = { + // only run the check for final stages that have not already been checked + val haveChecked = + agg.getTagValue[Boolean](GpuBaseAggregateMeta.aggPairReplaceChecked).contains(true) + val needCheck = !haveChecked && agg.aggregateExpressions.exists { + case e: AggregateExpression if e.mode == Final => true + case _ => false + } + + if (needCheck) { + agg.setTagValue(GpuBaseAggregateMeta.aggPairReplaceChecked, true) + val stages = GpuBaseAggregateMeta.getAggregateOfAllStages(this, agg.logicalLink.get) + // check if aggregations will mix CPU and GPU across stages + val hasMixedAggs = stages.indices.exists { + case i if i == stages.length - 1 => false + case i => stages(i).canThisBeReplaced ^ stages(i + 1).canThisBeReplaced + } + if (hasMixedAggs) { + stages.foreach { + case aggMeta if aggMeta.canThisBeReplaced => + aggMeta.willNotWorkOnGpu("mixing CPU and GPU aggregations is not supported") + case _ => + } + } + } + } + override def convertToGpu(): GpuExec = { GpuHashAggregateExec( aggRequiredChildDistributionExpressions, @@ -1091,7 +1143,7 @@ object GpuTypedImperativeSupportedAggregateExecMeta { meta.agg.setTagValue(bufferConverterInjected, true) // Fetch AggregateMetas of all stages which belong to current Aggregate - val stages = getAggregateOfAllStages(meta, meta.agg.logicalLink.get) + val 
stages = GpuBaseAggregateMeta.getAggregateOfAllStages(meta, meta.agg.logicalLink.get) // Find out stages in which the buffer converters are essential. val needBufferConversion = stages.indices.map { @@ -1222,21 +1274,6 @@ object GpuTypedImperativeSupportedAggregateExecMeta { expressions } - private def getAggregateOfAllStages( - currentMeta: SparkPlanMeta[_], logical: LogicalPlan): List[GpuBaseAggregateMeta[_]] = { - currentMeta match { - case aggMeta: GpuBaseAggregateMeta[_] if aggMeta.agg.logicalLink.contains(logical) => - List[GpuBaseAggregateMeta[_]](aggMeta) ++ - getAggregateOfAllStages(aggMeta.childPlans.head, logical) - case shuffleMeta: GpuShuffleMeta => - getAggregateOfAllStages(shuffleMeta.childPlans.head, logical) - case sortMeta: GpuSortMeta => - getAggregateOfAllStages(sortMeta.childPlans.head, logical) - case _ => - List[GpuBaseAggregateMeta[_]]() - } - } - @tailrec private def nextEdgeForConversion(meta: SparkPlanMeta[_]): Seq[SparkPlanMeta[_]] = { val child = meta.childPlans.head diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala index e9a6cb490ab..447862225a6 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala @@ -24,7 +24,6 @@ import scala.collection.mutable.ListBuffer import ai.rapids.cudf import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table} import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.shims.SparkShimImpl import org.apache.hadoop.conf.Configuration import org.apache.spark.broadcast.Broadcast @@ -32,6 +31,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.json.{GpuJsonUtils, JSONOptions, JSONOptionsInRead} +import org.apache.spark.sql.catalyst.json.rapids.shims.FileOptionsShims import org.apache.spark.sql.catalyst.util.PermissiveMode import org.apache.spark.sql.connector.read.{PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.QueryExecutionException @@ -156,7 +156,7 @@ object GpuJsonScan { if (!TypeChecks.areTimestampsSupported(parsedOptions.zoneId)) { meta.willNotWorkOnGpu("Only UTC zone id is supported") } - SparkShimImpl.timestampFormatInRead(parsedOptions).foreach { tsFormat => + FileOptionsShims.timestampFormatInRead(parsedOptions).foreach { tsFormat => val parts = tsFormat.split("'T'", 2) if (parts.isEmpty) { meta.willNotWorkOnGpu(s"the timestamp format '$tsFormat' is not supported") diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala index 02151f3acd0..49284f58f38 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -24,6 +24,7 @@ import org.apache.spark.TaskContext import org.apache.spark.executor.TaskMetrics import org.apache.spark.metrics.source.Source import org.apache.spark.resource.ResourceInformation +import org.apache.spark.scheduler.TaskLocality import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, TaskFailureListener} @@ -78,5 +79,8 @@ class MockTaskContext(taskAttemptId: Long, partitionId: Int) extends TaskContext override private[spark] def getLocalProperties = new Properties() def cpus(): Int = 2 -} + def numPartitions(): Int = 2 + + def taskLocality(): TaskLocality.TaskLocality = TaskLocality.ANY +} diff --git a/tests/pom.xml b/tests/pom.xml index 83d4c44df19..466578d412c 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -190,14 +190,8 @@ org.apache.hadoop - hadoop-common - ${spark.version} - provided - - - org.apache.hadoop - hadoop-mapreduce-client - ${spark.version} + hadoop-client + ${hadoop.client.version} provided
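A note on the mixed CPU/GPU aggregation check added to aggregate.scala above: tagForMixedReplacement collects every aggregation stage that shares the same logical aggregate and flags the plan whenever two adjacent stages disagree on whether they can be replaced. Below is a minimal sketch of that adjacency test, with plain booleans standing in for stages(i).canThisBeReplaced; the object and method names are illustrative only and are not part of the patch.

object MixedAggCheckSketch {
  // canReplace(i) stands in for stages(i).canThisBeReplaced in the patch.
  // The last index is excluded from the comparison, mirroring the
  // `i == stages.length - 1 => false` case, so a single-stage plan never matches.
  def hasMixedAggs(canReplace: Seq[Boolean]): Boolean =
    canReplace.indices.exists {
      case i if i == canReplace.length - 1 => false
      case i => canReplace(i) ^ canReplace(i + 1)
    }

  def main(args: Array[String]): Unit = {
    assert(!hasMixedAggs(Seq(true, true)))    // all stages replaced: OK
    assert(!hasMixedAggs(Seq(false, false)))  // no stage replaced: OK
    assert(hasMixedAggs(Seq(true, false)))    // partial on GPU, final on CPU: flagged
  }
}

When a mixed pair is found, only the replaceable stages are tagged with willNotWorkOnGpu, which keeps the whole aggregate on the CPU rather than splitting it across devices.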
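For completeness, the ProxyRapidsShuffleInternalManager added for the 321db shim only takes effect when Spark is configured to use the RAPIDS shuffle manager. The following is a hedged configuration sketch, assuming the plugin's usual per-shim naming convention for the public RapidsShuffleManager class; verify the exact class name against the built shim jar before relying on it.

import org.apache.spark.SparkConf

object RapidsShuffleConfSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: the shuffle-manager class for this shim follows the plugin's
    // per-shim package naming convention (com.nvidia.spark.rapids.spark321db).
    val conf = new SparkConf()
      .set("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .set("spark.shuffle.manager",
        "com.nvidia.spark.rapids.spark321db.RapidsShuffleManager")
    println(conf.get("spark.shuffle.manager"))
  }
}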