[Spark] Automatic type widening in Delta streaming sink (#3626)
## Description
This change introduces automatic type widening during schema evolution
in the Delta streaming sink.
Type widening triggers when all of the following conditions are met:
- Type widening is enabled on the Delta table
- Schema evolution (`mergeSchema`) is enabled on the sink
- The data written to the sink uses a type that is strictly wider than
the current type in the table schema, and the change from the narrower to
the wider type is eligible for type widening; see
`TypeWidening.isTypeChangeSupportedForSchemaEvolution`.

When all conditions are satisfied, the table schema is updated to use
the wider type before ingesting the data.
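As a minimal sketch, the gate added by this change reduces to the condition below (names as they appear in the diff: `canMergeSchema` reflects the sink's `mergeSchema` option, and `TypeWidening.isEnabled` checks the table protocol and properties; per-column eligibility is then checked via `TypeWidening.isTypeChangeSupportedForSchemaEvolution`):

```
// Sketch only: the flag passed to SchemaMergingUtils.mergeSchemas in this PR.
// Widening is attempted only when schema evolution is enabled on the sink AND
// type widening is enabled on the table.
val allowTypeWidening: Boolean =
  canMergeSchema && TypeWidening.isEnabled(protocol, metadata)
```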

## How was this patch tested?
Added test suite `TypeWideningStreamingSinkSuite` covering type widening
in the Delta streaming sink.

## Does this PR introduce _any_ user-facing changes?
This builds on the user-facing change introduced in
#3443, which allows writing to a
Delta sink using a different type than the current table type.
Without type widening:
```
from pyspark.sql.functions import col

(spark.readStream
    .table("delta_source")
    # Column 'a' has type INT in 'delta_sink'.
    .select(col("a").cast("long").alias("a"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", "<location>")
    .toTable("delta_sink"))
```
The write to the sink succeeds: column `a` retains its type `INT`, and
the data is cast from `LONG` to `INT` on write.

With type widening:
```
spark.sql("ALTER TABLE delta_sink SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")

(spark.readStream
    .table("delta_source")
    # Column 'a' has type INT in 'delta_sink'.
    .select(col("a").cast("long").alias("a"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", "<location>")
    .option("mergeSchema", "true")
    .toTable("delta_sink"))
```
The write to the sink succeeds: the type of column `a` is widened from
`INT` to `LONG`, and the data is ingested as `LONG`.
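A quick way to confirm the widening took effect (a hedged check in Scala, assuming the `delta_sink` table from the example above):

```
import org.apache.spark.sql.types.LongType

// After the widening batch commits, column `a` should report LONG.
assert(spark.table("delta_sink").schema("a").dataType == LongType)
```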
johanl-db authored Sep 6, 2024
1 parent 9bd2c43 commit d59e3f0
Showing 3 changed files with 250 additions and 14 deletions.
DeltaSink.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaOperations.StreamingUpdate
-import org.apache.spark.sql.delta.actions.{FileAction, Metadata, SetTransaction}
+import org.apache.spark.sql.delta.actions.{FileAction, Metadata, Protocol, SetTransaction}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaMergingUtils, SchemaUtils}
@@ -29,9 +29,8 @@ import org.apache.hadoop.fs.Path
// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.internal.MDC
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Expression}
+import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
@@ -130,7 +129,7 @@ case class DeltaSink(
      txn.readWholeTable()
    }

-    val writeSchema = getWriteSchema(txn.metadata, data.schema)
+    val writeSchema = getWriteSchema(txn.protocol, txn.metadata, data.schema)
    // Streaming sinks can't blindly overwrite schema. See Schema Management design doc for details
    updateMetadata(data.sparkSession, txn, writeSchema, partitionColumns, Map.empty,
      outputMode == OutputMode.Complete(), rearrangeOnly = false)
@@ -168,17 +167,20 @@

  /**
   * Returns the schema to use to write data to this delta table. The write schema includes new
-  * columns to add with schema evolution and reconciles types to match the table types.
+  * columns to add with schema evolution and reconciles types to match the table types when
+  * possible, or applies type widening if enabled.
   */
-  private def getWriteSchema(metadata: Metadata, dataSchema: StructType): StructType = {
+  private def getWriteSchema(
+      protocol: Protocol, metadata: Metadata, dataSchema: StructType): StructType = {
    if (!sqlConf.getConf(DeltaSQLConf.DELTA_STREAMING_SINK_ALLOW_IMPLICIT_CASTS)) return dataSchema

    if (canOverwriteSchema) return dataSchema

    SchemaMergingUtils.mergeSchemas(
-      tableSchema = metadata.schema,
-      dataSchema = dataSchema,
-      allowImplicitConversions = true
+      metadata.schema,
+      dataSchema,
+      allowImplicitConversions = true,
+      allowTypeWidening = canMergeSchema && TypeWidening.isEnabled(protocol, metadata)
    )
  }

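For illustration, a hedged sketch of what the new `allowTypeWidening` flag changes when reconciling schemas (the call shape is taken from the diff above; the behavior is as described in the PR summary, not an exact excerpt):

```
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._

val tableSchema = new StructType().add("a", IntegerType)
val dataSchema = new StructType().add("a", LongType)

// With allowTypeWidening = true, the merged schema adopts the wider LONG type,
// so updateMetadata evolves the table schema before the data is ingested.
// With the flag false, the table keeps INT and data is implicitly cast on write.
val merged = SchemaMergingUtils.mergeSchemas(
  tableSchema,
  dataSchema,
  allowImplicitConversions = true,
  allowTypeWidening = true)
```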
DeltaSinkImplicitCastSuite.scala
@@ -28,13 +28,14 @@ import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
-import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, Trigger}
import org.apache.spark.sql.types._

/**
- * Covers handling implicit casting to handle type mismatches when writing data to a Delta sink.
+ * Defines helper class & methods to test writing to a Delta streaming sink using data types that
+ * don't match the corresponding column type in the table schema.
 */
-abstract class DeltaSinkImplicitCastTest extends DeltaSinkTest {
+abstract class DeltaSinkImplicitCastSuiteBase extends DeltaSinkTest {

  override def beforeAll(): Unit = {
    super.beforeAll()
@@ -59,6 +60,7 @@ abstract class DeltaSinkImplicitCastTest extends DeltaSinkTest {
      extraOptions: Map[String, String])(
      data: T*)(
      selectExpr: String*): Unit = {
+    source.addData(data)
    val query =
      source.toDF()
        .selectExpr(selectExpr: _*)
@@ -67,9 +69,9 @@ abstract class DeltaSinkImplicitCastTest extends DeltaSinkTest {
        .outputMode(outputMode)
        .options(extraOptions)
        .format("delta")
+        .trigger(Trigger.AvailableNow())
        .start(outputDir.getCanonicalPath)
    try {
-      source.addData(data)
      failAfter(streamingTimeout) {
        query.processAllAvailable()
      }
@@ -106,7 +108,10 @@ abstract class DeltaSinkImplicitCastTest extends DeltaSinkTest {
  }
}

-class DeltaSinkImplicitCastSuite extends DeltaSinkImplicitCastTest {
+/**
+ * Covers handling implicit casting to handle type mismatches when writing data to a Delta sink.
+ */
+class DeltaSinkImplicitCastSuite extends DeltaSinkImplicitCastSuiteBase {
  import testImplicits._

  test(s"write wider type - long -> int") {
TypeWideningStreamingSinkSuite.scala (new file)
@@ -0,0 +1,229 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.typewidening

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.sources.{DeltaSink, DeltaSQLConf}

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._

/**
 * Suite covering automatic type widening in the Delta streaming sink.
 */
class TypeWideningStreamingSinkSuite
  extends DeltaSinkImplicitCastSuiteBase
  with TypeWideningTestMixin {

  import testImplicits._

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Set confs by default to enable automatic type widening in all tests. Negative tests
    // should explicitly disable these.
    spark.conf.set(DeltaSQLConf.DELTA_STREAMING_SINK_ALLOW_IMPLICIT_CASTS.key, "true")
    spark.conf.set(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey, "true")
    spark.conf.set(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true")
    // Ensure we don't silently cast test inputs to null on overflow.
    spark.conf.set(SQLConf.ANSI_ENABLED.key, "true")
  }

test("type isn't widened if schema evolution is disabled") {
withDeltaStream[Int] { stream =>
stream.write(17)("CAST(value AS SHORT)")
assert(stream.currentSchema("value").dataType === ShortType)
checkAnswer(stream.read(), Row(17))

withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "false") {
stream.write(53)("CAST(value AS INT)")
assert(stream.currentSchema("value").dataType === ShortType)
checkAnswer(stream.read(), Row(17) :: Row(53) :: Nil)
}
}
}

test("type isn't widened if type widening is disabled") {
withDeltaStream[Int] { stream =>
withSQLConf(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey -> "false") {
stream.write(17)("CAST(value AS SHORT)")
assert(stream.currentSchema("value").dataType === ShortType)
checkAnswer(stream.read(), Row(17))

stream.write(53)("CAST(value AS INT)")
assert(stream.currentSchema("value").dataType === ShortType)
checkAnswer(stream.read(), Row(17) :: Row(53) :: Nil)
}
}
}

test("type is widened if type widening and schema evolution are enabled") {
withDeltaStream[Int] { stream =>
stream.write(17)("CAST(value AS SHORT)")
assert(stream.currentSchema("value").dataType === ShortType)
checkAnswer(stream.read(), Row(17))

stream.write(Int.MaxValue)("CAST(value AS INT)")
assert(stream.currentSchema("value").dataType === IntegerType)
checkAnswer(stream.read(), Row(17) :: Row(Int.MaxValue) :: Nil)
}
}

test("type can be widened even if type casting is disabled in the sink") {
withDeltaStream[Int] { stream =>
stream.write(17)("CAST(value AS SHORT)")
assert(stream.currentSchema("value").dataType === ShortType)
checkAnswer(stream.read(), Row(17))

withSQLConf(DeltaSQLConf.DELTA_STREAMING_SINK_ALLOW_IMPLICIT_CASTS.key -> "false") {
stream.write(Int.MaxValue)("CAST(value AS INT)")
assert(stream.currentSchema("value").dataType === IntegerType)
checkAnswer(stream.read(), Row(17) :: Row(Int.MaxValue) :: Nil)
}
}
}

test("type isn't changed if it's not a wider type") {
withDeltaStream[Int] { stream =>
stream.write(Int.MaxValue)("CAST(value AS INT)")
assert(stream.currentSchema("value").dataType === IntegerType)
checkAnswer(stream.read(), Row(Int.MaxValue))

stream.write(17)("CAST(value AS SHORT)")
assert(stream.currentSchema("value").dataType === IntegerType)
checkAnswer(stream.read(), Row(Int.MaxValue) :: Row(17) :: Nil)
}
}

test("type isn't changed if it's not eligible for automatic widening: int -> decimal") {
withDeltaStream[Int] { stream =>
stream.write(17)("CAST(value AS INT)")
assert(stream.currentSchema("value").dataType === IntegerType)
checkAnswer(stream.read(), Row(17))

stream.write(567)("CAST(value AS DECIMAL(20, 0))")
assert(stream.currentSchema("value").dataType === IntegerType)
checkAnswer(stream.read(), Row(17) :: Row(567) :: Nil)
}
}

test("type isn't changed if it's not eligible for automatic widening: int -> double") {
withDeltaStream[Int] { stream =>
stream.write(17)("CAST(value AS INT)")
assert(stream.currentSchema("value").dataType === IntegerType)
checkAnswer(stream.read(), Row(17))

stream.write(567)("CAST(value AS DOUBLE)")
assert(stream.currentSchema("value").dataType === IntegerType)
checkAnswer(stream.read(), Row(17) :: Row(567) :: Nil)
}
}

test("widen type and add a new column with schema evolution") {
withDeltaStream[(Int, Int)] { stream =>
stream.write((17, -1))("CAST(_1 AS SHORT) AS a")
assert(stream.currentSchema === new StructType().add("a", ShortType))
checkAnswer(stream.read(), Row(17))

stream.write((12, 3456))("CAST(_1 AS INT) AS a", "CAST(_2 AS DECIMAL(10, 2)) AS b")
assert(stream.currentSchema === new StructType()
.add("a", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
.add("b", DecimalType(10, 2)))
checkAnswer(stream.read(), Row(17, null) :: Row(12, 3456) :: Nil)
}
}

test("widen type during write with missing column") {
withDeltaStream[(Int, Int)] { stream =>
stream.write((17, 45))("CAST(_1 AS SHORT) AS a", "CAST(_2 AS LONG) AS b")
assert(stream.currentSchema === new StructType()
.add("a", ShortType)
.add("b", LongType))
checkAnswer(stream.read(), Row(17, 45))

stream.write((12, -1))("CAST(_1 AS INT) AS a")
assert(stream.currentSchema === new StructType()
.add("a", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
.add("b", LongType))
checkAnswer(stream.read(), Row(17, 45) :: Row(12, null) :: Nil)
}
}

test("widen type after column rename and drop") {
withDeltaStream[(Int, Int)] { stream =>
stream.write((17, 45))("CAST(_1 AS SHORT) AS a", "CAST(_2 AS DECIMAL(10, 0)) AS b")
assert(stream.currentSchema === new StructType()
.add("a", ShortType)
.add("b", DecimalType(10, 0)))
checkAnswer(stream.read(), Row(17, 45))

sql(
s"""
|ALTER TABLE delta.`${stream.deltaLog.dataPath}` SET TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.minReaderVersion' = '2',
| 'delta.minWriterVersion' = '5'
|)
""".stripMargin)
sql(s"ALTER TABLE delta.`${stream.deltaLog.dataPath}` DROP COLUMN b")
sql(s"ALTER TABLE delta.`${stream.deltaLog.dataPath}` RENAME COLUMN a to c")
assert(stream.currentSchema === new StructType().add("c", ShortType))

stream.write((12, -1))("CAST(_1 AS INT) AS c")
assert(stream.currentSchema === new StructType().add("c", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 4, from = ShortType, to = IntegerType)))
checkAnswer(stream.read(), Row(17) :: Row(12) :: Nil)
}
}

test("type widening in addBatch") {
withTempDir { tempDir =>
val tablePath = tempDir.getAbsolutePath
val deltaLog = DeltaLog.forTable(spark, tablePath)
sqlContext.sparkContext.setLocalProperty(StreamExecution.QUERY_ID_KEY, "streaming_query")
val sink = DeltaSink(
sqlContext,
path = deltaLog.dataPath,
partitionColumns = Seq.empty,
outputMode = OutputMode.Append(),
options = new DeltaOptions(options = Map.empty, conf = spark.sessionState.conf)
)

val schema = new StructType().add("value", ShortType)

{
val data = Seq(0, 1).toDF("value").selectExpr("CAST(value AS SHORT)")
sink.addBatch(0, data)
val df = spark.read.format("delta").load(tablePath)
assert(df.schema === schema)
checkAnswer(df, Row(0) :: Row(1) :: Nil)
}
{
val data = Seq(2, 3).toDF("value").selectExpr("CAST(value AS INT)")
sink.addBatch(1, data)
val df = spark.read.format("delta").load(tablePath)
assert(df.schema === new StructType().add("value", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))
checkAnswer(df, Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil)
}
}
}
}
