[Spark] Relax check for generated columns and CHECK constraints on nested struct fields (#3601)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

Resolves #3250.

This PR relaxes the check on nested struct fields when only some of them are
referenced by CHECK constraints or generated columns, allowing more valid use
cases in scenarios involving type widening or schema evolution.

The core function, `checkConstraintsOrGeneratedColumnsOnStructField`, lets the
schema traversal inspect the inner fields of a `StructType` individually to
determine whether any of them are referenced by dependent (CHECK) constraints
or generated columns; for column types like `ArrayType` or `MapType`, the
function checks these properties directly without inspecting the inner fields.
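
A minimal sketch of the relaxed dispatch (illustrative only, not the exact code from the diff below; `checkLeafField` and `failIfReferenced` are hypothetical names standing in for the real helpers):

```scala
import org.apache.spark.sql.types.{DataType, StructType}

object RelaxedCheckSketch {
  // Simplified sketch: `failIfReferenced` is a placeholder for the dependent
  // CHECK-constraint / generated-column checks performed by the real code.
  def checkLeafField(
      path: Seq[String],
      currentDt: DataType,
      updatedDt: DataType)(
      failIfReferenced: (Seq[String], DataType, DataType) => Unit): Unit =
    (currentDt, updatedDt) match {
      // Struct-to-struct changes are skipped: the schema traversal visits the
      // inner fields one by one, so only the leaves that actually changed are checked.
      case (_: StructType, _: StructType) => ()
      // Arrays, maps and primitive fields are still checked as a whole column.
      case (from, to) if from != to => failIfReferenced(path, from, to)
      case _ => () // identical types, nothing to check
    }
}
```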

## How was this patch tested?
Through unit tests in `TypeWideningConstraintsSuite` and
`TypeWideningGeneratedColumnsSuite`.

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?
Yes. The following (valid) use case is no longer rejected by the check in
[ImplicitMetadataOperation.checkDependentExpressions](https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala#L241).

```sql
-- with `DELTA_SCHEMA_AUTO_MIGRATE` enabled
create table t (a struct<x: byte, y: byte>) using delta;
alter table t add constraint ck check (hash(a.x) > 0);

-- Changing the type of struct field `a.y` is now allowed because it is not
-- the field referenced by the CHECK constraint.
insert into t (a) values (named_struct('x', CAST(2 AS byte), 'y', 1030));
```
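
For contrast (an illustrative continuation of the example above, assuming the same session and table `t`, mirroring the behavior exercised in `TypeWideningConstraintsSuite`), widening the field that *is* referenced by the constraint is still rejected:

```sql
-- Still rejected: `a.x` is referenced by constraint `ck`, so auto-migrating it
-- from BYTE to INT fails with DELTA_CONSTRAINT_DATA_TYPE_MISMATCH.
insert into t (a) values (named_struct('x', 500, 'y', CAST(1 AS byte)));
```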

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->

---------

Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
xzhseh and tdas authored Sep 5, 2024
1 parent 7dfc6d9 commit 3a9762f
Showing 4 changed files with 223 additions and 80 deletions.
@@ -29,7 +29,7 @@ import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.FileSourceGeneratedMetadataStructField
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructType}

/**
* A trait that writers into Delta can extend to update the schema and/or partitioning of the table.
@@ -227,6 +227,104 @@ object ImplicitMetadataOperation {
}
}

/**
* Check whether there are dependent (CHECK) constraints referencing
* the column at `path`; if so, throw an error indicating
* the constraint data type mismatch.
*
* @param spark the spark session used.
* @param path the full column path for the current field.
* @param metadata the metadata used for checking dependent (CHECK) constraints.
* @param currentDt the current data type.
* @param updateDt the updated data type.
*/
private def checkDependentConstraints(
spark: SparkSession,
path: Seq[String],
metadata: Metadata,
currentDt: DataType,
updateDt: DataType): Unit = {
val dependentConstraints =
Constraints.findDependentConstraints(spark, path, metadata)
if (dependentConstraints.nonEmpty) {
throw DeltaErrors.constraintDataTypeMismatch(
path,
currentDt,
updateDt,
dependentConstraints
)
}
}

/**
* Check whether there are dependent generated columns referencing
* the column at `path`; if so, throw an error indicating
* the generated columns data type mismatch.
*
* @param spark the spark session used.
* @param path the full column path for the current field.
* @param protocol the protocol used.
* @param metadata the metadata used for checking dependent generated columns.
* @param currentDt the current data type.
* @param updateDt the updated data type.
*/
private def checkDependentGeneratedColumns(
spark: SparkSession,
path: Seq[String],
protocol: Protocol,
metadata: Metadata,
currentDt: DataType,
updateDt: DataType): Unit = {
val dependentGeneratedColumns = SchemaUtils.findDependentGeneratedColumns(
spark, path, protocol, metadata.schema)
if (dependentGeneratedColumns.nonEmpty) {
throw DeltaErrors.generatedColumnsDataTypeMismatch(
path,
currentDt,
updateDt,
dependentGeneratedColumns
)
}
}

/**
* Check whether the provided field is currently being referenced
* by CHECK constraints or generated columns.
* Note that this function deliberately skips the check for `StructType`
* to relax it: only the inner fields need to be checked, and every
* `StructType` is traversed field by field in [[checkDependentExpressions]].
*
* @param spark the spark session used.
* @param path the full column path for the current field.
* @param protocol the protocol used.
* @param metadata the metadata used for checking constraints and generated columns.
* @param currentDt the current data type.
* @param updateDt the updated data type.
*/
private def checkConstraintsOrGeneratedColumnsOnStructField(
spark: SparkSession,
path: Seq[String],
protocol: Protocol,
metadata: Metadata,
currentDt: DataType,
updateDt: DataType): Unit = (currentDt, updateDt) match {
// we explicitly ignore the check for `StructType` here.
case (StructType(_), StructType(_)) =>

// FIXME: we intentionally let `ArrayType` and `MapType` fall into this generic
// pattern match, mainly because the field paths for maps/arrays in
// constraints/generated columns are *NOT* consistent with regular field paths,
// e.g., `hash(a.arr[0].x)` vs. `hash(a.element.x)`.
// This makes it hard to recurse into maps/arrays and check the corresponding
// inner fields - if we recursed, we could not reliably block the operation even
// when the updated field is referenced by CHECK constraints or generated columns.
case (from, to) =>
if (currentDt != updateDt) {
checkDependentConstraints(spark, path, metadata, from, to)
checkDependentGeneratedColumns(spark, path, protocol, metadata, from, to)
}
}

/**
* Finds all fields that change between the current schema and the new data schema and fails if any
* of them are referenced by check constraints or generated columns.
@@ -236,42 +334,23 @@
protocol: Protocol,
metadata: actions.Metadata,
dataSchema: StructType): Unit =
SchemaMergingUtils.transformColumns(metadata.schema, dataSchema) {
case (fieldPath, currentField, Some(updateField), _)
// This condition is actually too strict, structs may be identified as changing because one
// of their field is changing even though that field isn't referenced by any constraint or
// generated column. This is intentional to keep the check simple and robust, esp. since it
// aligns with the historical behavior of this check.
if !SchemaMergingUtils.equalsIgnoreCaseAndCompatibleNullability(
currentField.dataType,
updateField.dataType
) =>
val columnPath = fieldPath :+ currentField.name
// check if the field to change is referenced by check constraints
val dependentConstraints =
Constraints.findDependentConstraints(sparkSession, columnPath, metadata)
if (dependentConstraints.nonEmpty) {
throw DeltaErrors.constraintDataTypeMismatch(
columnPath,
currentField.dataType,
updateField.dataType,
dependentConstraints
)
}
// check if the field to change is referenced by any generated columns
val dependentGenCols = SchemaUtils.findDependentGeneratedColumns(
sparkSession, columnPath, protocol, metadata.schema)
if (dependentGenCols.nonEmpty) {
throw DeltaErrors.generatedColumnsDataTypeMismatch(
columnPath,
currentField.dataType,
updateField.dataType,
dependentGenCols
)
}
// We don't transform the schema but just perform checks, the returned field won't be used
// anyway.
updateField
case (_, field, _, _) => field
}
SchemaMergingUtils.transformColumns(metadata.schema, dataSchema) {
case (fieldPath, currentField, Some(updateField), _)
if !SchemaMergingUtils.equalsIgnoreCaseAndCompatibleNullability(
currentField.dataType,
updateField.dataType
) =>
checkConstraintsOrGeneratedColumnsOnStructField(
spark = sparkSession,
path = fieldPath :+ currentField.name,
protocol = protocol,
metadata = metadata,
currentDt = currentField.dataType,
updateDt = updateField.dataType
)
// We don't transform the schema but just perform checks,
// the returned field won't be used anyway.
updateField
case (_, field, _, _) => field
}
}
@@ -790,22 +790,14 @@ trait GeneratedColumnSuiteBase
createTable(table, None, "t STRUCT<a: SMALLINT, b: SMALLINT>, gen SMALLINT",
Map("gen" -> "CAST(HASH(t.a - 10s) AS SMALLINT)"), Nil)

checkError(
exception = intercept[AnalysisException] {
Seq((32767.toShort, 32767)).toDF("a", "b")
.selectExpr("named_struct('a', a, 'b', b) as t")
.write.format("delta").mode("append")
.option("mergeSchema", "true")
.saveAsTable(table)
},
errorClass = "DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH",
parameters = Map(
"columnName" -> "t",
"columnType" -> "STRUCT<a: SMALLINT, b: SMALLINT>",
"dataType" -> "STRUCT<a: SMALLINT, b: INT>",
"generatedColumns" -> "gen -> CAST(HASH(t.a - 10s) AS SMALLINT)"
)
)
// changing the type of `t.b` should succeed since it is not being
// referenced by any CHECK constraints or generated columns.
Seq((32767.toShort, 32767)).toDF("a", "b")
.selectExpr("named_struct('a', a, 'b', b) as t")
.write.format("delta").mode("append")
.option("mergeSchema", "true")
.saveAsTable(table)
checkAnswer(spark.table(table), Row(Row(32767, 32767), -22677) :: Nil)
}
}

@@ -133,25 +133,58 @@ trait TypeWideningConstraintsTests { self: QueryTest with TypeWideningTestMixin
},
errorClass = "DELTA_CONSTRAINT_DATA_TYPE_MISMATCH",
parameters = Map(
"columnName" -> "a",
"columnType" -> "STRUCT<x: TINYINT, y: TINYINT>",
"dataType" -> "STRUCT<x: INT, y: TINYINT>",
"columnName" -> "a.x",
"columnType" -> "TINYINT",
"dataType" -> "INT",
"constraints" -> "delta.constraints.ck -> hash ( a . x ) > 0"
))
)
)

// changing the type of struct field `a.y` when it's not
// the field referenced by the CHECK constraint is allowed.
sql("INSERT INTO t (a) VALUES (named_struct('x', CAST(2 AS byte), 'y', 500))")
checkAnswer(sql("SELECT hash(a.x) FROM t"), Seq(Row(1765031574), Row(1765031574)))
}
}
}

// We're currently too strict and reject changing the type of struct field a.y even though
// it's not the field referenced by the CHECK constraint.
test("check constraint on nested field with complex type evolution") {
withTable("t") {
sql("CREATE TABLE t (a struct<x: struct<z: byte, h: byte>, y: byte>) USING DELTA")
sql("ALTER TABLE t ADD CONSTRAINT ck CHECK (hash(a.x.z) > 0)")
sql("INSERT INTO t (a) VALUES (named_struct('x', named_struct('z', 2, 'h', 3), 'y', 4))")
checkAnswer(sql("SELECT hash(a.x.z) FROM t"), Row(1765031574))

withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
checkError(
exception = intercept[DeltaAnalysisException] {
sql("INSERT INTO t (a) VALUES (named_struct('x', CAST(2 AS byte), 'y', 500))")
sql(
s"""
| INSERT INTO t (a) VALUES (
| named_struct('x', named_struct('z', 200, 'h', 3), 'y', 4)
| )
|""".stripMargin
)
},
errorClass = "DELTA_CONSTRAINT_DATA_TYPE_MISMATCH",
parameters = Map(
"columnName" -> "a",
"columnType" -> "STRUCT<x: TINYINT, y: TINYINT>",
"dataType" -> "STRUCT<x: TINYINT, y: INT>",
"constraints" -> "delta.constraints.ck -> hash ( a . x ) > 0"
))
"columnName" -> "a.x.z",
"columnType" -> "TINYINT",
"dataType" -> "INT",
"constraints" -> "delta.constraints.ck -> hash ( a . x . z ) > 0"
)
)

// changing the types of struct fields `a.y` and `a.x.h` is allowed since they
// are not the fields referenced by the CHECK constraint.
sql(
"""
| INSERT INTO t (a) VALUES (
| named_struct('x', named_struct('z', CAST(2 AS BYTE), 'h', 2002), 'y', 1030)
| )
|""".stripMargin
)
checkAnswer(sql("SELECT hash(a.x.z) FROM t"), Seq(Row(1765031574), Row(1765031574)))
}
}
}
@@ -130,7 +130,7 @@ trait TypeWideningGeneratedColumnTests extends GeneratedColumnTest {
partitionColumns = Seq.empty
)
sql("INSERT INTO t (a) VALUES (named_struct('x', 2, 'y', 3))")
checkAnswer(sql("SELECT hash(a.x) FROM t"), Row(1765031574))
checkAnswer(sql("SELECT gen FROM t"), Row(1765031574))

withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
checkError(
@@ -139,25 +139,64 @@ trait TypeWideningGeneratedColumnTests extends GeneratedColumnTest {
},
errorClass = "DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH",
parameters = Map(
"columnName" -> "a",
"columnType" -> "STRUCT<x: TINYINT, y: TINYINT>",
"dataType" -> "STRUCT<x: INT, y: TINYINT>",
"columnName" -> "a.x",
"columnType" -> "TINYINT",
"dataType" -> "INT",
"generatedColumns" -> "gen -> hash(a.x)"
))
)
)

// We're currently too strict and reject changing the type of struct field a.y even though
// it's not the field referenced by the generated column.
// changing the type of struct field `a.y` when it's not
// the field referenced by the generated column is allowed.
sql("INSERT INTO t (a) VALUES (named_struct('x', CAST(2 AS byte), 'y', 200))")
checkAnswer(sql("SELECT gen FROM t"), Seq(Row(1765031574), Row(1765031574)))
}
}
}

test("generated column on nested field with complex type evolution") {
withTable("t") {
createTable(
tableName = "t",
path = None,
schemaString = "a struct<x: struct<z: byte, h: byte>, y: byte>, gen int",
generatedColumns = Map("gen" -> "hash(a.x.z)"),
partitionColumns = Seq.empty
)

sql("INSERT INTO t (a) VALUES (named_struct('x', named_struct('z', 2, 'h', 3), 'y', 4))")
checkAnswer(sql("SELECT gen FROM t"), Row(1765031574))

withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
checkError(
exception = intercept[DeltaAnalysisException] {
sql("INSERT INTO t (a) VALUES (named_struct('x', CAST(2 AS byte), 'y', 200))")
sql(
s"""
| INSERT INTO t (a) VALUES (
| named_struct('x', named_struct('z', 200, 'h', 3), 'y', 4)
| )
|""".stripMargin
)
},
errorClass = "DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH",
parameters = Map(
"columnName" -> "a",
"columnType" -> "STRUCT<x: TINYINT, y: TINYINT>",
"dataType" -> "STRUCT<x: TINYINT, y: INT>",
"generatedColumns" -> "gen -> hash(a.x)"
))
"columnName" -> "a.x.z",
"columnType" -> "TINYINT",
"dataType" -> "INT",
"generatedColumns" -> "gen -> hash(a.x.z)"
)
)

// changing the types of struct fields `a.y` and `a.x.h` is allowed since they
// are not referenced by the generated column.
sql(
"""
| INSERT INTO t (a) VALUES (
| named_struct('x', named_struct('z', CAST(2 AS BYTE), 'h', 2002), 'y', 1030)
| )
|""".stripMargin
)
checkAnswer(sql("SELECT gen FROM t"), Seq(Row(1765031574), Row(1765031574)))
}
}
}
