[SPARK] Allow non-deterministic expressions in actions of merge (#3558)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


This PR makes `MergeIntoCommandBase` extend
`SupportsNonDeterministicExpression`, a Spark trait that logical plans
can extend to declare whether they allow non-deterministic expressions
and can therefore pass the `CheckAnalysis` rule.

`MergeIntoCommandBase` overrides `allowNonDeterministicExpression` so
that it returns true only when all the conditions in the merge command
(the merge condition and every clause condition) are deterministic.

This is harmless and allows more flexible usage of merge. For example,
we use a non-deterministic UDF to generate identity values for identity
columns, so it is required to allow non-deterministic expressions in
updated/inserted column values of merge statements in order to support
merge on target tables with identity columns. So this PR is part of
#1959.
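
To make the rule concrete, here is a minimal, self-contained sketch of the check in plain Scala. It does not use Spark: `Expr`, `Leaf`, `Binary`, `Clause`, and `Merge` are hypothetical stand-ins for the Catalyst expression and Delta merge-clause classes, and only the shape of `allowNonDeterministicExpression` mirrors this PR.

```scala
// Hypothetical stand-ins for Catalyst expressions; not the real Spark classes.
sealed trait Expr { def deterministic: Boolean }

// A column reference, literal, or function call such as rand().
final case class Leaf(sql: String, deterministic: Boolean = true) extends Expr

// A binary expression is deterministic only if both children are.
final case class Binary(op: String, left: Expr, right: Expr) extends Expr {
  def deterministic: Boolean = left.deterministic && right.deterministic
}

// A WHEN clause: an optional condition plus the expressions its action assigns.
final case class Clause(condition: Option[Expr], actions: Seq[Expr])

final case class Merge(
    condition: Expr,
    matchedClauses: Seq[Clause],
    notMatchedClauses: Seq[Clause],
    notMatchedBySourceClauses: Seq[Clause]) {

  // Actions may be non-deterministic; every condition must be deterministic.
  def allowNonDeterministicExpression: Boolean = {
    def isConditionDeterministic(clause: Clause): Boolean =
      clause.condition.forall(_.deterministic)
    condition.deterministic &&
      matchedClauses.forall(isConditionDeterministic) &&
      notMatchedClauses.forall(isConditionDeterministic) &&
      notMatchedBySourceClauses.forall(isConditionDeterministic)
  }
}

object MergeDeterminismSketch {
  val rand: Expr = Leaf("rand()", deterministic = false)
  val on: Expr = Binary("=", Leaf("t.key"), Leaf("s.key"))

  // UPDATE SET value = s.value + rand(): non-deterministic action, no condition.
  val noisyUpdate: Clause = Clause(None, Seq(Binary("+", Leaf("s.value"), rand)))

  // WHEN MATCHED AND rand() > 0.5: non-deterministic clause condition.
  val randomCondition: Clause =
    Clause(Some(Binary(">", rand, Leaf("0.5"))), Seq(Leaf("s.value")))

  def main(args: Array[String]): Unit = {
    println(Merge(on, Seq(noisyUpdate), Nil, Nil).allowNonDeterministicExpression) // true
    println(Merge(on, Seq(randomCondition), Nil, Nil).allowNonDeterministicExpression) // false
  }
}
```

In this model the action expressions are never inspected, which is the point of the change: only the conditions decide whether the plan is accepted.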



## How was this patch tested?
New test cases.

## Does this PR introduce _any_ user-facing changes?


Yes.
We are changing the behavior to allow non-deterministic expressions in
updated/inserted column values of merge statements. We still don't allow
non-deterministic expressions in conditions of merge statements.

For example, before this change a merge statement could not add random
noise to the value it writes:

```
MERGE INTO target USING source
ON target.key = source.key
WHEN MATCHED THEN UPDATE SET target.value = source.value + rand()
```

This is now allowed. It can be useful for data privacy: adding noise
avoids disclosing the actual values while approximately preserving
aggregate properties of the data, such as the mean.
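
As a rough illustration of the noise idea outside of Spark, the plain-Scala sketch below perturbs a column of values and compares means. It assumes zero-mean noise: Spark's `rand()` returns values uniformly distributed in [0, 1), so adding it unchanged would raise the mean by about 0.5, and the sketch therefore subtracts 0.5. The object and method names are hypothetical.

```scala
import scala.util.Random

object NoiseMeanSketch {
  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Adds uniform noise in [-0.5, 0.5) to each value (zero mean in expectation).
  def addCenteredNoise(xs: Seq[Double], rng: Random): Seq[Double] =
    xs.map(x => x + rng.nextDouble() - 0.5)

  def main(args: Array[String]): Unit = {
    val rng = new Random(42L) // fixed seed so runs are repeatable
    val values = Seq.tabulate(100000)(i => (i % 100).toDouble)
    val noisy = addCenteredNoise(values, rng)
    println(f"original mean = ${mean(values)}%.3f")
    println(f"noisy mean    = ${mean(noisy)}%.3f")
  }
}
```

Individual values move by up to 0.5, while the two means should agree closely because the noise averages out over many rows.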
zhipengmao-db authored Aug 15, 2024
1 parent 78479de commit f28c7e9
Showing 2 changed files with 185 additions and 1 deletion.
@@ -45,7 +45,8 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
   with PredicateHelper
   with ImplicitMetadataOperation
   with MergeIntoMaterializeSource
-  with UpdateExpressionsSupport {
+  with UpdateExpressionsSupport
+  with SupportsNonDeterministicExpression {

   @transient val source: LogicalPlan
   @transient val target: LogicalPlan
@@ -470,6 +471,22 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
     super.prepareMergeSource(
       spark, source, condition, matchedClauses, notMatchedClauses, isInsertOnly)
   }
+
+  /** Returns whether it allows non-deterministic expressions. */
+  override def allowNonDeterministicExpression: Boolean = {
+    def isConditionDeterministic(mergeClause: DeltaMergeIntoClause): Boolean = {
+      mergeClause.condition match {
+        case Some(c) => c.deterministic
+        case None => true
+      }
+    }
+    // Allow actions to be non-deterministic while all the conditions
+    // must be deterministic.
+    condition.deterministic &&
+      matchedClauses.forall(isConditionDeterministic) &&
+      notMatchedClauses.forall(isConditionDeterministic) &&
+      notMatchedBySourceClauses.forall(isConditionDeterministic)
+  }
 }

 object MergeIntoCommandBase {
@@ -19,6 +19,8 @@ package org.apache.spark.sql.delta
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
+import org.scalatest.matchers.must.Matchers.be
+import org.scalatest.matchers.should.Matchers.noException

 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, ResolveSessionCatalog}
@@ -265,6 +267,171 @@ class MergeIntoSQLSuite extends MergeIntoSuiteBase
}
}

+  test("detect nondeterministic merge condition") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key AND $nonDeterministicCondition",
+            update(condition = "s.key < 2", set = "key = s.key, value = s.value")))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }

+  test("detect nondeterministic update condition in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            update(
+              condition = nonDeterministicCondition,
+              set = "key = s.key, value = s.value")))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }

+  test("detect nondeterministic delete condition in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            delete(condition = nonDeterministicCondition)))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }

+  test("detect nondeterministic insert condition in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            insert(
+              condition = nonDeterministicCondition,
+              values = "(key, value) VALUES (s.key, s.value)")))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }

+  test("detect nondeterministic updateNotMatched condition in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            updateNotMatched(
+              condition = nonDeterministicCondition,
+              set = "key = t.key, value = t.value + 1")))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }

+  test("detect nondeterministic deleteNotMatched condition in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            deleteNotMatched(condition = nonDeterministicCondition)))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }

+  test("allow nondeterministic update action in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicAction = "rand()"
+        noException should be thrownBy {
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            update(
+              condition = "s.key < 2",
+              set = s"key = s.key, value = $nonDeterministicAction"))
+        }
+    }
+  }

+  test("allow nondeterministic insert action in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicAction = "rand()"
+        noException should be thrownBy {
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            insert(
+              condition = "s.key < 2",
+              values = s"(key, value) VALUES (s.key, $nonDeterministicAction)"))
+        }
+    }
+  }

+  test("allow nondeterministic updateNotMatched action in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicAction = "rand()"
+        noException should be thrownBy {
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            updateNotMatched(
+              condition = "t.key < 2",
+              set = s"key = t.key, value = t.value + $nonDeterministicAction"))
+        }
+    }
+  }

test("merge into a dataset temp views with star") {
withTempView("v") {
def testMergeWithView(testClue: String): Unit = {