Update GpuFileFormatWriter to stay in sync with recent Spark changes, but still without support for writing Hive bucketed tables on the GPU. #4484

51 changes: 51 additions & 0 deletions integration_tests/src/main/python/datasourcev2_write_test.py
@@ -0,0 +1,51 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest

from asserts import assert_gpu_fallback_write
from data_gen import *
from marks import *
from pyspark.sql.types import *

@ignore_order
@allow_non_gpu('DataWritingCommandExec')
@pytest.mark.parametrize('fileFormat', ['parquet', 'orc'])
def test_write_hive_bucketed_table_fallback(spark_tmp_path, spark_tmp_table_factory, fileFormat):
    """
    Fall back to the CPU because the GPU does not support Hive hash partitioning.
    """
    src = spark_tmp_table_factory.get()
    table = spark_tmp_table_factory.get()

    def write_hive_table(spark):
        data = map(lambda i: (i % 13, str(i), i % 5), range(50))
        df = spark.createDataFrame(data, ["i", "j", "k"])
        df.write.mode("overwrite").partitionBy("k").bucketBy(8, "i", "j").format(fileFormat).saveAsTable(src)

        spark.sql("""
            create table if not exists {0}
            using hive options(fileFormat \"{1}\")
            as select * from {2}
            """.format(table, fileFormat, src))

    data_path = spark_tmp_path + '/HIVE_DATA'

    assert_gpu_fallback_write(
        lambda spark, _: write_hive_table(spark),
        lambda spark, _: spark.sql("SELECT * FROM {}".format(table)),
        data_path,
        'DataWritingCommandExec',
        conf={"hive.exec.dynamic.partition": "true",
              "hive.exec.dynamic.partition.mode": "nonstrict"})
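As a side note, the scenario the new test covers can also be reproduced by hand from spark-shell. The snippet below is only a sketch: the table names are invented, and it assumes a Hive-enabled SparkSession with the RAPIDS Accelerator plugin loaded. The point is that the physical plan for the Hive write keeps the CPU DataWritingCommandExec, which is exactly what assert_gpu_fallback_write asserts above.

```scala
// Hand-run sketch (table names invented); assumes a Hive-enabled SparkSession with the
// RAPIDS Accelerator enabled, mirroring the test above.
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

spark.range(50)
  .selectExpr("id % 13 AS i", "CAST(id AS STRING) AS j", "id % 5 AS k")
  .write.mode("overwrite")
  .partitionBy("k")
  .bucketBy(8, "i", "j")
  .format("parquet")
  .saveAsTable("bucketed_src")

val ctas =
  "CREATE TABLE IF NOT EXISTS hive_copy USING hive OPTIONS(fileFormat 'parquet') " +
    "AS SELECT * FROM bucketed_src"

// The plan for the write should still contain the CPU DataWritingCommandExec,
// i.e. the write is not replaced by a GPU operator.
spark.sql("EXPLAIN " + ctas).show(false)
spark.sql(ctas)
```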
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -178,7 +178,7 @@ class GpuDynamicPartitionDataWriter(
  private val isPartitioned = description.partitionColumns.nonEmpty

  /** Flag saying whether or not the data to be written out is bucketed. */
-  private val isBucketed = description.bucketIdExpression.isDefined
+  private val isBucketed = description.bucketSpec.isDefined

  if (isBucketed) {
    throw new UnsupportedOperationException("Bucketing is not supported on the GPU yet.")
@@ -395,6 +395,16 @@ class GpuDynamicPartitionDataWriter(
  }
}

+/**
+ * Bucketing specification for all the write tasks.
+ * This is the GPU version of `org.apache.spark.sql.execution.datasources.WriterBucketSpec`
+ * @param bucketIdExpression Expression to calculate bucket id based on bucket column(s).
+ * @param bucketFileNamePrefix Prefix of output file name based on bucket id.
+ */
+case class GpuWriterBucketSpec(
+    bucketIdExpression: Expression,
+    bucketFileNamePrefix: Int => String)
+
/**
 * A shared job description for all the GPU write tasks.
 * This is the GPU version of `org.apache.spark.sql.execution.datasources.WriteJobDescription`.
@@ -406,7 +416,7 @@ class GpuWriteJobDescription(
    val allColumns: Seq[Attribute],
    val dataColumns: Seq[Attribute],
    val partitionColumns: Seq[Attribute],
-    val bucketIdExpression: Option[Expression],
+    val bucketSpec: Option[GpuWriterBucketSpec],
    val path: String,
    val customPartitionLocations: Map[TablePartitionSpec, String],
    val maxRecordsPerFile: Long,
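For context on what GpuWriterBucketSpec (added above) mirrors: Spark's CPU-side FileFormatWriter builds its WriterBucketSpec roughly as sketched below. This is a paraphrase, not the exact Spark source; the helper name and the `hiveCompatible` flag are stand-ins for Spark's real options check. The Hive branch depends on HiveHash, which has no GPU implementation yet, which is why the GPU writer still rejects bucketed writes.

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, BitwiseAnd, HiveHash, Literal, Pmod}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.datasources.WriterBucketSpec

// Paraphrased sketch of the CPU path; `hiveCompatible` stands in for the options check
// Spark actually performs, and the helper name is ours.
def cpuWriterBucketSpec(
    spec: BucketSpec,
    dataColumns: Seq[Attribute],
    hiveCompatible: Boolean): WriterBucketSpec = {
  val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
  if (hiveCompatible) {
    // Hive bucketing: HiveHash of the bucket columns, masked to stay non-negative, then
    // modulo the bucket count; the GPU has no HiveHash, hence the fallback in this PR.
    val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
    WriterBucketSpec(Pmod(hashId, Literal(spec.numBuckets)), id => f"$id%05d_0_")
  } else {
    // Spark-native bucketing: reuse HashPartitioning's (Murmur3-based) partition id so
    // the bucket layout matches shuffle partitioning.
    WriterBucketSpec(
      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression,
      (_: Int) => "")
  }
}
```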
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -128,19 +128,13 @@ object GpuFileFormatWriter extends Logging {

    val empty2NullPlan = if (needConvert) GpuProjectExec(projectList, plan) else plan

-    val bucketIdExpression = bucketSpec.map { _ =>
-      // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
-      // guarantee the data distribution is same between shuffle and bucketed data source, which
-      // enables us to only shuffle one side when join a bucketed table and a normal one.
-      //HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
-      //
-      // TODO: Cannot support this until we either:
-      //  Guarantee join and bucketing are both on the GPU and disable GPU-writing if join not on GPU
-      //  OR
-      //  Guarantee GPU hash partitioning is 100% compatible with CPU hashing
+    val writerBucketSpec: Option[GpuWriterBucketSpec] = bucketSpec.map { spec =>
+      // TODO: Cannot support this until we:
+      //  support Hive hash partitioning on the GPU
      throw new UnsupportedOperationException("GPU hash partitioning for bucketed data is not "
          + "compatible with the CPU version")
    }

    val sortColumns = bucketSpec.toSeq.flatMap {
      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
    }
@@ -161,7 +155,7 @@ object GpuFileFormatWriter extends Logging {
      allColumns = outputSpec.outputColumns,
      dataColumns = dataColumns,
      partitionColumns = partitionColumns,
-      bucketIdExpression = bucketIdExpression,
+      bucketSpec = writerBucketSpec,
      path = outputSpec.outputPath,
      customPartitionLocations = outputSpec.customPartitionLocations,
      maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong)
@@ -172,7 +166,8 @@ object GpuFileFormatWriter extends Logging {
    )

    // We should first sort by partition columns, then bucket id, and finally sorting columns.
-    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+    val requiredOrdering =
+      partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
    // the sort order doesn't matter
    val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
    val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
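To make the required ordering concrete: for a table partitioned by `k`, bucketed by (`i`, `j`) and sorted by `j`, the writer wants rows clustered by partition value, then bucket id, then the sort column, so each output file maps to one contiguous run. A toy sketch in plain Scala (not plugin code; the hash is a stand-in for the real bucket id expression):

```scala
// Toy illustration of the required write ordering (plain Scala, not plugin code).
case class Rec(i: Int, j: String, k: Int)

// Stand-in for the real bucket id expression; the actual hash (Murmur3 or HiveHash) differs.
def bucketId(r: Rec, numBuckets: Int): Int = Math.floorMod((r.i, r.j).hashCode, numBuckets)

val rows = Seq(Rec(7, "b", 1), Rec(3, "a", 0), Rec(7, "a", 1), Rec(3, "c", 0))
// Partition column k first, then bucket id, then the table's sort column j.
val ordered = rows.sortBy(r => (r.k, bucketId(r, numBuckets = 8), r.j))
// Each output file then corresponds to one run of equal (k, bucketId) in `ordered`.
```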
@@ -298,7 +293,7 @@ object GpuFileFormatWriter extends Logging {
      if (sparkPartitionId != 0 && !iterator.hasNext) {
        // In case of empty job, leave first partition to save meta for file format like parquet.
        new GpuEmptyDirectoryDataWriter(description, taskAttemptContext, committer)
-      } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
+      } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
        new GpuSingleDirectoryDataWriter(description, taskAttemptContext, committer)
      } else {
        new GpuDynamicPartitionDataWriter(description, taskAttemptContext, committer)