From f28c7e96789e40e36004645b7635ecab79550334 Mon Sep 17 00:00:00 2001 From: Zhipeng Mao Date: Thu, 15 Aug 2024 21:44:52 +0200 Subject: [PATCH] [SPARK] Allow non-deterministic expressions in actions of merge (#3558) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description It makes `MergeIntoCommandBase` extend a trait `SupportsNonDeterministicExpression` in Spark that logical plans can extend to check whether it can allow non-deterministic expressions and pass the CheckAnalysis rule. `MergeIntoCommandBase` extends `SupportsNonDeterministicExpression` to check whether all the conditions in the Merge command are deterministic. This is harmless and allows more flexible usage of merge. For example, we use a non-deterministic UDF to generate identity values for identity columns, so it is required to allow non-deterministic expressions in updated/inserted column values of merge statements in order to support merge on target tables with identity columns. So this PR is part of https://github.com/delta-io/delta/issues/1959. ## How was this patch tested? New test cases. ## Does this PR introduce _any_ user-facing changes? Yes. We are changing the behavior to allow non-deterministic expressions in updated/inserted column values of merge statements. We still don't allow non-deterministic expressions in conditions of merge statements. e.g. We currently don't allow the merge statement to add a random noise to the value that is inserted in merge ``` MERGE INTO target USING source ON target.key = source.key WHEN MATCHED THEN UPDATE SET target.value = source.value + rand() ``` Now we are allowing this as this may be helpful in terms of data privacy to not disclose the actual data while preserving the data properties e.g. mean values etc. --- .../delta/commands/MergeIntoCommandBase.scala | 19 +- .../spark/sql/delta/MergeIntoSQLSuite.scala | 167 ++++++++++++++++++ 2 files changed, 185 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala index 81fe6d49863..bb8f4047ea8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala @@ -45,7 +45,8 @@ trait MergeIntoCommandBase extends LeafRunnableCommand with PredicateHelper with ImplicitMetadataOperation with MergeIntoMaterializeSource - with UpdateExpressionsSupport { + with UpdateExpressionsSupport + with SupportsNonDeterministicExpression { @transient val source: LogicalPlan @transient val target: LogicalPlan @@ -470,6 +471,22 @@ trait MergeIntoCommandBase extends LeafRunnableCommand super.prepareMergeSource( spark, source, condition, matchedClauses, notMatchedClauses, isInsertOnly) } + + /** Returns whether it allows non-deterministic expressions. */ + override def allowNonDeterministicExpression: Boolean = { + def isConditionDeterministic(mergeClause: DeltaMergeIntoClause): Boolean = { + mergeClause.condition match { + case Some(c) => c.deterministic + case None => true + } + } + // Allow actions to be non-deterministic while all the conditions + // must be deterministic. + condition.deterministic && + matchedClauses.forall(isConditionDeterministic) && + notMatchedClauses.forall(isConditionDeterministic) && + notMatchedBySourceClauses.forall(isConditionDeterministic) + } } object MergeIntoCommandBase { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala index 2213bfd04d0..fdbc20fe924 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest} +import org.scalatest.matchers.must.Matchers.be +import org.scalatest.matchers.should.Matchers.noException import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{Analyzer, ResolveSessionCatalog} @@ -265,6 +267,171 @@ class MergeIntoSQLSuite extends MergeIntoSuiteBase } } + test("detect nondeterministic merge condition") { + withKeyValueData( + source = (0, 0) :: (1, 10) :: (2, 20) :: Nil, + target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil + ) { + case (sourceName, targetName) => + val nonDeterministicCondition = "rand() > 0.5" + val e = intercept[AnalysisException]( + executeMerge( + tgt = s"$targetName t", + src = s"$sourceName s", + cond = s"s.key = t.key AND $nonDeterministicCondition", + update(condition = "s.key < 2", set = "key = s.key, value = s.value"))) + assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED")) + } + } + + test("detect nondeterministic update condition in merge") { + withKeyValueData( + source = (0, 0) :: (1, 10) :: (2, 20) :: Nil, + target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil + ) { + case (sourceName, targetName) => + val nonDeterministicCondition = "rand() > 0.5" + val e = intercept[AnalysisException]( + executeMerge( + tgt = s"$targetName t", + src = s"$sourceName s", + cond = s"s.key = t.key", + update( + condition = nonDeterministicCondition, + set = "key = s.key, value = s.value"))) + assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED")) + } + } + + test("detect nondeterministic delete condition in merge") { + withKeyValueData( + source = (0, 0) :: (1, 10) :: (2, 20) :: Nil, + target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil + ) { + case (sourceName, targetName) => + val nonDeterministicCondition = "rand() > 0.5" + val e = intercept[AnalysisException]( + executeMerge( + tgt = s"$targetName t", + src = s"$sourceName s", + cond = s"s.key = t.key", + delete(condition = nonDeterministicCondition))) + assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED")) + } + } + + test("detect nondeterministic insert condition in merge") { + withKeyValueData( + source = (0, 0) :: (1, 10) :: (2, 20) :: Nil, + target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil + ) { + case (sourceName, targetName) => + val nonDeterministicCondition = "rand() > 0.5" + val e = intercept[AnalysisException]( + executeMerge( + tgt = s"$targetName t", + src = s"$sourceName s", + cond = s"s.key = t.key", + insert( + condition = nonDeterministicCondition, + values = "(key, value) VALUES (s.key, s.value)"))) + assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED")) + } + } + + test("detect nondeterministic updateNotMatched condition in merge") { + withKeyValueData( + source = (0, 0) :: (1, 10) :: (2, 20) :: Nil, + target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil + ) { + case (sourceName, targetName) => + val nonDeterministicCondition = "rand() > 0.5" + val e = intercept[AnalysisException]( + executeMerge( + tgt = s"$targetName t", + src = s"$sourceName s", + cond = s"s.key = t.key", + updateNotMatched( + condition = nonDeterministicCondition, + set = "key = t.key, value = t.value + 1"))) + assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED")) + } + } + + test("detect nondeterministic deleteNotMatched condition in merge") { + withKeyValueData( + source = (0, 0) :: (1, 10) :: (2, 20) :: Nil, + target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil + ) { + case (sourceName, targetName) => + val nonDeterministicCondition = "rand() > 0.5" + val e = intercept[AnalysisException]( + executeMerge( + tgt = s"$targetName t", + src = s"$sourceName s", + cond = s"s.key = t.key", + deleteNotMatched(condition = nonDeterministicCondition))) + assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED")) + } + } + + test("allow nondeterministic update action in merge") { + withKeyValueData( + source = (0, 0) :: (1, 10) :: (2, 20) :: Nil, + target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil + ) { + case (sourceName, targetName) => + val nonDeterministicAction = "rand()" + noException should be thrownBy { + executeMerge( + tgt = s"$targetName t", + src = s"$sourceName s", + cond = s"s.key = t.key", + update( + condition = "s.key < 2", + set = s"key = s.key, value = $nonDeterministicAction")) + } + } + } + + test("allow nondeterministic insert action in merge") { + withKeyValueData( + source = (0, 0) :: (1, 10) :: (2, 20) :: Nil, + target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil + ) { + case (sourceName, targetName) => + val nonDeterministicAction = "rand()" + noException should be thrownBy { + executeMerge( + tgt = s"$targetName t", + src = s"$sourceName s", + cond = s"s.key = t.key", + insert( + condition = "s.key < 2", + values = s"(key, value) VALUES (s.key, $nonDeterministicAction)")) + } + } + } + + test("allow nondeterministic updateNotMatched action in merge") { + withKeyValueData( + source = (0, 0) :: (1, 10) :: (2, 20) :: Nil, + target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil + ) { + case (sourceName, targetName) => + val nonDeterministicAction = "rand()" + noException should be thrownBy { + executeMerge( + tgt = s"$targetName t", + src = s"$sourceName s", + cond = s"s.key = t.key", + updateNotMatched( + condition = "t.key < 2", + set = s"key = t.key, value = t.value + $nonDeterministicAction")) + } + } + } + test("merge into a dataset temp views with star") { withTempView("v") { def testMergeWithView(testClue: String): Unit = {