Fix PreprocessTableMerge to include new columns from WHEN MATCHED clauses

Delta's `MERGE INTO` fails when there are multiple `UPDATE` clauses, one of which is `UPDATE SET *`, with schema evolution enabled.
Specifically, if `UPDATE SET *` is used to merge a source with a superset of the target's columns and an additional `UPDATE SET` clause is present (which must operate on a target column), the merge fails because some source-only columns cannot be resolved. The example below fails:
```sql
SET spark.databricks.delta.schema.autoMerge.enabled=true;

-- tgt1 has columns: [a]
-- s    has columns: [a, b]
WITH s(a, b) AS (SELECT * FROM VALUES (1, 's_b'))
MERGE INTO zvs.tgt1 t USING s ON t.a = s.a
  WHEN MATCHED AND t.a < 1 THEN UPDATE SET t.a = 0
  WHEN MATCHED THEN UPDATE SET *

-- output:
-- Error in SQL statement: AnalysisException: Resolved attribute(s) b#247210 missing from ...
```
This case appears to have been missed when implementing `processedMatched` in `PreprocessTableMerge`: other `WHEN MATCHED` clauses can introduce new columns that must be filled in with `null`, but currently only the `WHEN NOT MATCHED` clauses are considered. This is best shown by following the code flow for the example above:

- `processedMatched` maps over (clause1 `[SET t.a = 0]`, clause2 `[SET *]`).
- resolvedActions:
  - clause1's resolvedActions are `[a = 0]`.
  - clause2's resolvedActions are `[a = a, b = b]`, which triggers schema evolution: the target now has schema `[a, b]`.

Now only clause1 is considered, and this causes the failure. clause2 matters only in that it triggers schema evolution, so `finalSchema` is `[a, b]`. Since there are no `WHEN NOT MATCHED` clauses, there are no `newColumns`. The only `UpdateOperation` used in `generateUpdateExpressions` for clause1 is therefore `[a = 0]`: `generateUpdateExpressions` is called with `targetCols` `[a, b]` but only the `updateOp` `[a = 0]`. Column `b` (not present in the original target) is passed through, and an unresolvable attribute ends up in the final plan.
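To make the failure mode concrete, here is a minimal, self-contained Scala sketch. The names (`UpdateAlignmentSketch`, `UpdateOp`, `alignUpdates`) are hypothetical stand-ins, not the actual Delta classes; it only illustrates how aligning clause1's single operation against the evolved schema `[a, b]` leaves `b` as an unresolvable pass-through:

```scala
// Hypothetical, simplified stand-ins for the real classes; for illustration only.
object UpdateAlignmentSketch {
  final case class UpdateOp(targetCol: String, expr: String)

  // Simplified analogue of generateUpdateExpressions: for every column of the
  // final (possibly evolved) schema, use the matching update op if one exists,
  // otherwise pass the column through unchanged -- which is exactly what breaks
  // when the column never existed in the original target.
  def alignUpdates(finalSchema: Seq[String], ops: Seq[UpdateOp]): Seq[String] =
    finalSchema.map { col =>
      ops.find(_.targetCol == col) match {
        case Some(op) => s"$col := ${op.expr}"
        case None     => s"$col := $col (pass-through)" // unresolvable if `col` is source-only
      }
    }

  def main(args: Array[String]): Unit = {
    val finalSchema = Seq("a", "b")            // after schema evolution from UPDATE SET *
    val clause1Ops  = Seq(UpdateOp("a", "0"))  // the only op from the conditional clause
    alignUpdates(finalSchema, clause1Ops).foreach(println)
    // a := 0
    // b := b (pass-through)   <- the attribute that ends up unresolved in the plan
  }
}
```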

The fix is to consider new columns from the other `WHEN MATCHED` clauses as well as from the `WHEN NOT MATCHED` clauses.
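Conceptually, the fix pads each matched clause's operations with explicit `null` assignments for any column introduced by the other clauses. A minimal, self-contained sketch of that idea (again with hypothetical names, not the actual Delta code):

```scala
object MatchedClausePaddingSketch {
  final case class UpdateOp(targetCol: String, expr: String)

  // Columns seen in any WHEN MATCHED / WHEN NOT MATCHED clause but absent from the
  // target get an explicit NULL assignment in every matched clause that does not
  // already set them, so alignment over the evolved schema never falls back to an
  // unresolvable pass-through.
  def padClauseOps(clauseOps: Seq[UpdateOp],
                   allClauseCols: Seq[String],
                   targetCols: Seq[String]): Seq[UpdateOp] = {
    val newCols = allClauseCols.distinct
      .filterNot(targetCols.contains)
      .filterNot(c => clauseOps.exists(_.targetCol == c))
    clauseOps ++ newCols.map(c => UpdateOp(c, "NULL"))
  }

  def main(args: Array[String]): Unit = {
    // clause1 sets only `a`; columns across all clauses are [a, b]; target has only [a].
    val padded = padClauseOps(Seq(UpdateOp("a", "0")), Seq("a", "b"), Seq("a"))
    println(padded) // List(UpdateOp(a,0), UpdateOp(b,NULL)) -> column b is now covered
  }
}
```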

A new unit test validates the correct behavior with multiple `UPDATE` clauses where one is `UPDATE SET *`.

GitOrigin-RevId: f2a849c1fc5589a26512e0a7f1cc5adc8e8eb7f1
zachschuermann authored and zsxwing committed Aug 22, 2022
1 parent 7b36691 commit 943e153
Showing 2 changed files with 21 additions and 9 deletions.
Changes to `PreprocessTableMerge`:

```diff
@@ -94,14 +94,13 @@ case class PreprocessTableMerge(override val conf: SQLConf)
 
     val processedMatched = matched.map {
       case m: DeltaMergeIntoUpdateClause =>
-        // Get any new columns which are in the insert clause, but not the target output or this
-        // update clause.
+        // Get any new columns which are in the update/insert clauses, but not the target output
         val existingColumns = m.resolvedActions.map(_.targetColNameParts.head) ++
           target.output.map(_.name)
-        val newColumns = notMatched.toSeq.flatMap {
-          _.resolvedActions.filterNot { insertAct =>
+        val newColumns = (matched ++ notMatched).toSeq.flatMap {
+          _.resolvedActions.filterNot { action =>
             existingColumns.exists { colName =>
-              conf.resolver(insertAct.targetColNameParts.head, colName)
+              conf.resolver(action.targetColNameParts.head, colName)
             }
           }
         }
@@ -118,7 +117,7 @@
           builder.result()
         }
 
-        val newColsFromInsert = distinctBy(newColumns)(_.targetColNameParts).map { action =>
+        val newColumnsDistinct = distinctBy(newColumns)(_.targetColNameParts).map { action =>
           AttributeReference(action.targetColNameParts.head, action.dataType)()
         }
 
@@ -127,8 +126,8 @@
           UpdateOperation(a.targetColNameParts, a.expr)
         }
 
-        // And construct operations for columns that the insert clause will add.
-        val newOpsFromInsert = newColsFromInsert.map { col =>
+        // And construct operations for columns that the insert/update clauses will add.
+        val newUpdateOps = newColumnsDistinct.map { col =>
           UpdateOperation(Seq(col.name), Literal(null, col.dataType))
         }
 
@@ -147,7 +146,7 @@
         // that nested fields can be updated (only for existing columns).
         val alignedExprs = generateUpdateExpressions(
           finalSchemaExprs,
-          existingUpdateOps ++ newOpsFromInsert,
+          existingUpdateOps ++ newUpdateOps,
           conf.resolver,
           allowStructEvolution = migrateSchema,
           generatedColumns = generatedColumns)
```
Changes to `MergeIntoSuiteBase`:

```diff
@@ -2569,6 +2569,19 @@ abstract class MergeIntoSuiteBase
     expectedWithoutEvolution = ((0, 0) +: (3, 30) +: (1, 1) +: Nil).toDF("key", "value")
   )
 
+  testEvolution("new column with update set and update *")(
+    targetData = Seq((0, 0), (1, 10), (2, 20)).toDF("key", "value"),
+    sourceData = Seq((1, 1, "extra1"), (2, 2, "extra2")).toDF("key", "value", "extra"),
+    clauses = update(condition = "s.key < 2", set = "value = s.value") :: update("*") :: Nil,
+    expected =
+      ((0, 0, null) +:
+        (1, 1, null) +: // updated by first clause
+        (2, 2, "extra2") +: // updated by second clause
+        Nil
+      ).toDF("key", "value", "extra"),
+    expectedWithoutEvolution = ((0, 0) +: (1, 1) +: (2, 2) +: Nil).toDF("key", "value")
+  )
+
   testEvolution("update * with column not in source")(
     targetData = Seq((0, 0, 0), (1, 10, 10), (3, 30, 30)).toDF("key", "value", "extra"),
     sourceData = Seq((1, 1), (2, 2)).toDF("key", "value"),
```
