diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala
index ba8a0530116..df673cbe0db 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.DeltaOperations.StreamingUpdate
-import org.apache.spark.sql.delta.actions.{FileAction, Metadata, SetTransaction}
+import org.apache.spark.sql.delta.actions.{FileAction, Metadata, Protocol, SetTransaction}
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaMergingUtils, SchemaUtils}
@@ -29,9 +29,8 @@ import org.apache.hadoop.fs.Path
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Expression}
+import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
@@ -130,7 +129,7 @@ case class DeltaSink(
       txn.readWholeTable()
     }
 
-    val writeSchema = getWriteSchema(txn.metadata, data.schema)
+    val writeSchema = getWriteSchema(txn.protocol, txn.metadata, data.schema)
     // Streaming sinks can't blindly overwrite schema. See Schema Management design doc for details
     updateMetadata(data.sparkSession, txn, writeSchema, partitionColumns, Map.empty,
       outputMode == OutputMode.Complete(), rearrangeOnly = false)
@@ -168,17 +167,20 @@ case class DeltaSink(
 
   /**
    * Returns the schema to use to write data to this delta table. The write schema includes new
-   * columns to add with schema evolution and reconciles types to match the table types.
+   * columns to add with schema evolution and reconciles types to match the table types when
+   * possible, or applies type widening if it is enabled.
    */
-  private def getWriteSchema(metadata: Metadata, dataSchema: StructType): StructType = {
+  private def getWriteSchema(
+      protocol: Protocol, metadata: Metadata, dataSchema: StructType): StructType = {
     if (!sqlConf.getConf(DeltaSQLConf.DELTA_STREAMING_SINK_ALLOW_IMPLICIT_CASTS)) return dataSchema
     if (canOverwriteSchema) return dataSchema
 
     SchemaMergingUtils.mergeSchemas(
-      tableSchema = metadata.schema,
-      dataSchema = dataSchema,
-      allowImplicitConversions = true
+      metadata.schema,
+      dataSchema,
+      allowImplicitConversions = true,
+      allowTypeWidening = canMergeSchema && TypeWidening.isEnabled(protocol, metadata)
     )
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkImplicitCastSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkImplicitCastSuite.scala
index adc8c0e540c..27966ef23b2 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkImplicitCastSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkImplicitCastSuite.scala
@@ -28,13 +28,14 @@ import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
 import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
-import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, Trigger}
 import org.apache.spark.sql.types._
 
 /**
- * Covers handling implicit casting to handle type mismatches when writing data to a Delta sink.
+ * Defines helper class & methods to test writing to a Delta streaming sink using data types that
+ * don't match the corresponding column type in the table schema.
  */
-abstract class DeltaSinkImplicitCastTest extends DeltaSinkTest {
+abstract class DeltaSinkImplicitCastSuiteBase extends DeltaSinkTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -59,6 +60,7 @@ abstract class DeltaSinkImplicitCastTest extends DeltaSinkTest {
       extraOptions: Map[String, String])(
       data: T*)(
       selectExpr: String*): Unit = {
+    source.addData(data)
     val query = source.toDF()
       .selectExpr(selectExpr: _*)
       .writeStream
@@ -67,9 +69,9 @@ abstract class DeltaSinkImplicitCastTest extends DeltaSinkTest {
       .outputMode(outputMode)
      .options(extraOptions)
       .format("delta")
+      .trigger(Trigger.AvailableNow())
       .start(outputDir.getCanonicalPath)
     try {
-      source.addData(data)
       failAfter(streamingTimeout) {
         query.processAllAvailable()
       }
@@ -106,7 +108,10 @@ abstract class DeltaSinkImplicitCastTest extends DeltaSinkTest {
   }
 }
 
-class DeltaSinkImplicitCastSuite extends DeltaSinkImplicitCastTest {
+/**
+ * Covers handling implicit casting to handle type mismatches when writing data to a Delta sink.
+ */
+class DeltaSinkImplicitCastSuite extends DeltaSinkImplicitCastSuiteBase {
   import testImplicits._
 
   test(s"write wider type - long -> int") {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSinkSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSinkSuite.scala
new file mode 100644
index 00000000000..de5fd6c36ec
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSinkSuite.scala
@@ -0,0 +1,229 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.typewidening
+
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.sources.{DeltaSink, DeltaSQLConf}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types._
+
+/**
+ * Suite covering automatic type widening in the Delta streaming sink.
+ */
+class TypeWideningStreamingSinkSuite
+    extends DeltaSinkImplicitCastSuiteBase
+    with TypeWideningTestMixin {
+
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // Enable automatic type widening by default in all tests. Negative tests should
+    // explicitly disable these confs.
+    spark.conf.set(DeltaSQLConf.DELTA_STREAMING_SINK_ALLOW_IMPLICIT_CASTS.key, "true")
+    spark.conf.set(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey, "true")
+    spark.conf.set(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true")
+    // Ensure we don't silently cast test inputs to null on overflow.
+    spark.conf.set(SQLConf.ANSI_ENABLED.key, "true")
+  }
+
+  test("type isn't widened if schema evolution is disabled") {
+    withDeltaStream[Int] { stream =>
+      stream.write(17)("CAST(value AS SHORT)")
+      assert(stream.currentSchema("value").dataType === ShortType)
+      checkAnswer(stream.read(), Row(17))
+
+      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "false") {
+        stream.write(53)("CAST(value AS INT)")
+        assert(stream.currentSchema("value").dataType === ShortType)
+        checkAnswer(stream.read(), Row(17) :: Row(53) :: Nil)
+      }
+    }
+  }
+
+  test("type isn't widened if type widening is disabled") {
+    withDeltaStream[Int] { stream =>
+      withSQLConf(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey -> "false") {
+        stream.write(17)("CAST(value AS SHORT)")
+        assert(stream.currentSchema("value").dataType === ShortType)
+        checkAnswer(stream.read(), Row(17))
+
+        stream.write(53)("CAST(value AS INT)")
+        assert(stream.currentSchema("value").dataType === ShortType)
+        checkAnswer(stream.read(), Row(17) :: Row(53) :: Nil)
+      }
+    }
+  }
+
+  test("type is widened if type widening and schema evolution are enabled") {
+    withDeltaStream[Int] { stream =>
+      stream.write(17)("CAST(value AS SHORT)")
+      assert(stream.currentSchema("value").dataType === ShortType)
+      checkAnswer(stream.read(), Row(17))
+
+      stream.write(Int.MaxValue)("CAST(value AS INT)")
+      assert(stream.currentSchema("value").dataType === IntegerType)
+      checkAnswer(stream.read(), Row(17) :: Row(Int.MaxValue) :: Nil)
+    }
+  }
+
+  test("type can be widened even if type casting is disabled in the sink") {
+    withDeltaStream[Int] { stream =>
+      stream.write(17)("CAST(value AS SHORT)")
+      assert(stream.currentSchema("value").dataType === ShortType)
+      checkAnswer(stream.read(), Row(17))
+
+      withSQLConf(DeltaSQLConf.DELTA_STREAMING_SINK_ALLOW_IMPLICIT_CASTS.key -> "false") {
+        stream.write(Int.MaxValue)("CAST(value AS INT)")
+        assert(stream.currentSchema("value").dataType === IntegerType)
+        checkAnswer(stream.read(), Row(17) :: Row(Int.MaxValue) :: Nil)
+      }
+    }
+  }
+
+  test("type isn't changed if it's not a wider type") {
+    withDeltaStream[Int] { stream =>
+      stream.write(Int.MaxValue)("CAST(value AS INT)")
+      assert(stream.currentSchema("value").dataType === IntegerType)
+      checkAnswer(stream.read(), Row(Int.MaxValue))
+
+      stream.write(17)("CAST(value AS SHORT)")
+      assert(stream.currentSchema("value").dataType === IntegerType)
+      checkAnswer(stream.read(), Row(Int.MaxValue) :: Row(17) :: Nil)
+    }
+  }
+
+  test("type isn't changed if it's not eligible for automatic widening: int -> decimal") {
+    withDeltaStream[Int] { stream =>
+      stream.write(17)("CAST(value AS INT)")
+      assert(stream.currentSchema("value").dataType === IntegerType)
+      checkAnswer(stream.read(), Row(17))
+
+      stream.write(567)("CAST(value AS DECIMAL(20, 0))")
+      assert(stream.currentSchema("value").dataType === IntegerType)
+      checkAnswer(stream.read(), Row(17) :: Row(567) :: Nil)
+    }
+  }
+
+  test("type isn't changed if it's not eligible for automatic widening: int -> double") {
+    withDeltaStream[Int] { stream =>
+      stream.write(17)("CAST(value AS INT)")
+      assert(stream.currentSchema("value").dataType === IntegerType)
+      checkAnswer(stream.read(), Row(17))
+
+      stream.write(567)("CAST(value AS DOUBLE)")
+      assert(stream.currentSchema("value").dataType === IntegerType)
+      checkAnswer(stream.read(), Row(17) :: Row(567) :: Nil)
+    }
+  }
+
+  test("widen type and add a new column with schema evolution") {
+    withDeltaStream[(Int, Int)] { stream =>
+      stream.write((17, -1))("CAST(_1 AS SHORT) AS a")
+      assert(stream.currentSchema === new StructType().add("a", ShortType))
+      checkAnswer(stream.read(), Row(17))
+
+      stream.write((12, 3456))("CAST(_1 AS INT) AS a", "CAST(_2 AS DECIMAL(10, 2)) AS b")
+      assert(stream.currentSchema === new StructType()
+        .add("a", IntegerType, nullable = true,
+          metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
+        .add("b", DecimalType(10, 2)))
+      checkAnswer(stream.read(), Row(17, null) :: Row(12, 3456) :: Nil)
+    }
+  }
+
+  test("widen type during write with missing column") {
+    withDeltaStream[(Int, Int)] { stream =>
+      stream.write((17, 45))("CAST(_1 AS SHORT) AS a", "CAST(_2 AS LONG) AS b")
+      assert(stream.currentSchema === new StructType()
+        .add("a", ShortType)
+        .add("b", LongType))
+      checkAnswer(stream.read(), Row(17, 45))
+
+      stream.write((12, -1))("CAST(_1 AS INT) AS a")
+      assert(stream.currentSchema === new StructType()
+        .add("a", IntegerType, nullable = true,
+          metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
+        .add("b", LongType))
+      checkAnswer(stream.read(), Row(17, 45) :: Row(12, null) :: Nil)
+    }
+  }
+
+  test("widen type after column rename and drop") {
+    withDeltaStream[(Int, Int)] { stream =>
+      stream.write((17, 45))("CAST(_1 AS SHORT) AS a", "CAST(_2 AS DECIMAL(10, 0)) AS b")
+      assert(stream.currentSchema === new StructType()
+        .add("a", ShortType)
+        .add("b", DecimalType(10, 0)))
+      checkAnswer(stream.read(), Row(17, 45))
+
+      sql(
+        s"""
+           |ALTER TABLE delta.`${stream.deltaLog.dataPath}` SET TBLPROPERTIES (
+           |  'delta.columnMapping.mode' = 'name',
+           |  'delta.minReaderVersion' = '2',
+           |  'delta.minWriterVersion' = '5'
+           |)
+         """.stripMargin)
+      sql(s"ALTER TABLE delta.`${stream.deltaLog.dataPath}` DROP COLUMN b")
+      sql(s"ALTER TABLE delta.`${stream.deltaLog.dataPath}` RENAME COLUMN a to c")
+      assert(stream.currentSchema === new StructType().add("c", ShortType))
+
+      stream.write((12, -1))("CAST(_1 AS INT) AS c")
+      assert(stream.currentSchema === new StructType().add("c", IntegerType, nullable = true,
+        metadata = typeWideningMetadata(version = 4, from = ShortType, to = IntegerType)))
+      checkAnswer(stream.read(), Row(17) :: Row(12) :: Nil)
+    }
+  }
+
+  test("type widening in addBatch") {
+    withTempDir { tempDir =>
+      val tablePath = tempDir.getAbsolutePath
+      val deltaLog = DeltaLog.forTable(spark, tablePath)
+      sqlContext.sparkContext.setLocalProperty(StreamExecution.QUERY_ID_KEY, "streaming_query")
+      val sink = DeltaSink(
+        sqlContext,
+        path = deltaLog.dataPath,
+        partitionColumns = Seq.empty,
+        outputMode = OutputMode.Append(),
+        options = new DeltaOptions(options = Map.empty, conf = spark.sessionState.conf)
+      )
+
+      val schema = new StructType().add("value", ShortType)
+
+      {
+        val data = Seq(0, 1).toDF("value").selectExpr("CAST(value AS SHORT)")
+        sink.addBatch(0, data)
+        val df = spark.read.format("delta").load(tablePath)
+        assert(df.schema === schema)
+        checkAnswer(df, Row(0) :: Row(1) :: Nil)
+      }
+      {
+        val data = Seq(2, 3).toDF("value").selectExpr("CAST(value AS INT)")
+        sink.addBatch(1, data)
+        val df = spark.read.format("delta").load(tablePath)
+        assert(df.schema === new StructType().add("value", IntegerType, nullable = true,
+          metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))
+        checkAnswer(df, Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil)
+      }
+    }
+  }
+}