diff --git a/integration_tests/src/main/python/datasourcev2_write_test.py b/integration_tests/src/main/python/datasourcev2_write_test.py
new file mode 100644
index 00000000000..ac9b7bfeb30
--- /dev/null
+++ b/integration_tests/src/main/python/datasourcev2_write_test.py
@@ -0,0 +1,51 @@
+# Copyright (c) 2022, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pytest
+
+from asserts import assert_gpu_fallback_write
+from data_gen import *
+from marks import *
+from pyspark.sql.types import *
+
+@ignore_order
+@allow_non_gpu('DataWritingCommandExec')
+@pytest.mark.parametrize('fileFormat', ['parquet', 'orc'])
+def test_write_hive_bucketed_table_fallback(spark_tmp_path, spark_tmp_table_factory, fileFormat):
+    """
+    Falls back to the CPU because the GPU does not support Hive hash partitioning.
+    """
+    src = spark_tmp_table_factory.get()
+    table = spark_tmp_table_factory.get()
+
+    def write_hive_table(spark):
+
+        data = map(lambda i: (i % 13, str(i), i % 5), range(50))
+        df = spark.createDataFrame(data, ["i", "j", "k"])
+        df.write.mode("overwrite").partitionBy("k").bucketBy(8, "i", "j").format(fileFormat).saveAsTable(src)
+
+        spark.sql("""
+            create table if not exists {0}
+            using hive options(fileFormat \"{1}\")
+            as select * from {2}
+            """.format(table, fileFormat, src))
+
+    data_path = spark_tmp_path + '/HIVE_DATA'
+
+    assert_gpu_fallback_write(
+        lambda spark, _: write_hive_table(spark),
+        lambda spark, _: spark.sql("SELECT * FROM {}".format(table)),
+        data_path,
+        'DataWritingCommandExec',
+        conf = {"hive.exec.dynamic.partition": "true",
+                "hive.exec.dynamic.partition.mode": "nonstrict"})
\ No newline at end of file
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
index 51435e65551..bba53765735 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -178,7 +178,7 @@ class GpuDynamicPartitionDataWriter(
   private val isPartitioned = description.partitionColumns.nonEmpty
 
   /** Flag saying whether or not the data to be written out is bucketed. */
-  private val isBucketed = description.bucketIdExpression.isDefined
+  private val isBucketed = description.bucketSpec.isDefined
 
   if (isBucketed) {
     throw new UnsupportedOperationException("Bucketing is not supported on the GPU yet.")
@@ -395,6 +395,16 @@ class GpuDynamicPartitionDataWriter(
   }
 }
 
+/**
+ * Bucketing specification for all the write tasks.
+ * This is the GPU version of `org.apache.spark.sql.execution.datasources.WriterBucketSpec`
+ * @param bucketIdExpression Expression to calculate bucket id based on bucket column(s).
+ * @param bucketFileNamePrefix Prefix of output file name based on bucket id.
+ */
+case class GpuWriterBucketSpec(
+    bucketIdExpression: Expression,
+    bucketFileNamePrefix: Int => String)
+
 /**
  * A shared job description for all the GPU write tasks.
  * This is the GPU version of `org.apache.spark.sql.execution.datasources.WriteJobDescription`.
@@ -406,7 +416,7 @@ class GpuWriteJobDescription(
     val allColumns: Seq[Attribute],
     val dataColumns: Seq[Attribute],
     val partitionColumns: Seq[Attribute],
-    val bucketIdExpression: Option[Expression],
+    val bucketSpec: Option[GpuWriterBucketSpec],
     val path: String,
     val customPartitionLocations: Map[TablePartitionSpec, String],
     val maxRecordsPerFile: Long,
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala
index bd868496995..cf6edd7ea20 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -128,19 +128,13 @@ object GpuFileFormatWriter extends Logging {
 
     val empty2NullPlan = if (needConvert) GpuProjectExec(projectList, plan) else plan
 
-    val bucketIdExpression = bucketSpec.map { _ =>
-      // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
-      // guarantee the data distribution is same between shuffle and bucketed data source, which
-      // enables us to only shuffle one side when join a bucketed table and a normal one.
-      //HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
-      //
-      // TODO: Cannot support this until we either:
-      //  Guarantee join and bucketing are both on the GPU and disable GPU-writing if join not on GPU
-      //  OR
-      //  Guarantee GPU hash partitioning is 100% compatible with CPU hashing
+    val writerBucketSpec: Option[GpuWriterBucketSpec] = bucketSpec.map { spec =>
+      // TODO: Cannot support this until we:
+      //  support Hive hash partitioning on the GPU
       throw new UnsupportedOperationException("GPU hash partitioning for bucketed data is not " +
           "compatible with the CPU version")
     }
+
     val sortColumns = bucketSpec.toSeq.flatMap { spec =>
       spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
     }
@@ -161,7 +155,7 @@ object GpuFileFormatWriter extends Logging {
       allColumns = outputSpec.outputColumns,
       dataColumns = dataColumns,
       partitionColumns = partitionColumns,
-      bucketIdExpression = bucketIdExpression,
+      bucketSpec = writerBucketSpec,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong)
@@ -172,7 +166,8 @@ object GpuFileFormatWriter extends Logging {
     )
 
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
-    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+    val requiredOrdering =
+      partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
     // the sort order doesn't matter
     val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
@@ -298,7 +293,7 @@ object GpuFileFormatWriter extends Logging {
     if (sparkPartitionId != 0 && !iterator.hasNext) {
       // In case of empty job, leave first partition to save meta for file format like parquet.
       new GpuEmptyDirectoryDataWriter(description, taskAttemptContext, committer)
-    } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
+    } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
       new GpuSingleDirectoryDataWriter(description, taskAttemptContext, committer)
     } else {
       new GpuDynamicPartitionDataWriter(description, taskAttemptContext, committer)