diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala
index 81fe6d49863..bb8f4047ea8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala
@@ -45,7 +45,8 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
   with PredicateHelper
   with ImplicitMetadataOperation
   with MergeIntoMaterializeSource
-  with UpdateExpressionsSupport {
+  with UpdateExpressionsSupport
+  with SupportsNonDeterministicExpression {
 
   @transient val source: LogicalPlan
   @transient val target: LogicalPlan
@@ -470,6 +471,24 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
     super.prepareMergeSource(
       spark, source, condition, matchedClauses, notMatchedClauses, isInsertOnly)
   }
+
+  /**
+   * Returns whether this command allows non-deterministic expressions.
+   *
+   * Clause actions may be non-deterministic, but the merge `condition` and the condition of
+   * every clause in `matchedClauses`, `notMatchedClauses` and `notMatchedBySourceClauses`
+   * must all be deterministic.
+   */
+  override def allowNonDeterministicExpression: Boolean = {
+    // A clause that has no condition is treated as deterministic.
+    def isConditionDeterministic(mergeClause: DeltaMergeIntoClause): Boolean =
+      mergeClause.condition.forall(_.deterministic)
+
+    condition.deterministic &&
+      matchedClauses.forall(isConditionDeterministic) &&
+      notMatchedClauses.forall(isConditionDeterministic) &&
+      notMatchedBySourceClauses.forall(isConditionDeterministic)
+  }
 }
 
 object MergeIntoCommandBase {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala
index 2213bfd04d0..fdbc20fe924 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
+import org.scalatest.matchers.should.Matchers.{be, noException}
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, ResolveSessionCatalog}
@@ -265,6 +266,171 @@ class MergeIntoSQLSuite extends MergeIntoSuiteBase
     }
   }
 
+  test("detect nondeterministic merge condition") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key AND $nonDeterministicCondition",
+            update(condition = "s.key < 2", set = "key = s.key, value = s.value")))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }
+
+  test("detect nondeterministic update condition in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            update(
+              condition = nonDeterministicCondition,
+              set = "key = s.key, value = s.value")))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }
+
+  test("detect nondeterministic delete condition in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            delete(condition = nonDeterministicCondition)))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }
+
+  test("detect nondeterministic insert condition in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            insert(
+              condition = nonDeterministicCondition,
+              values = "(key, value) VALUES (s.key, s.value)")))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }
+
+  test("detect nondeterministic updateNotMatched condition in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            updateNotMatched(
+              condition = nonDeterministicCondition,
+              set = "key = t.key, value = t.value + 1")))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }
+
+  test("detect nondeterministic deleteNotMatched condition in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicCondition = "rand() > 0.5"
+        val e = intercept[AnalysisException](
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            deleteNotMatched(condition = nonDeterministicCondition)))
+        assert(e.getMessage.contains("DELTA_NON_DETERMINISTIC_FUNCTION_NOT_SUPPORTED"))
+    }
+  }
+
+  test("allow nondeterministic update action in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicAction = "rand()"
+        noException should be thrownBy {
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            update(
+              condition = "s.key < 2",
+              set = s"key = s.key, value = $nonDeterministicAction"))
+        }
+    }
+  }
+
+  test("allow nondeterministic insert action in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicAction = "rand()"
+        noException should be thrownBy {
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            insert(
+              condition = "s.key < 2",
+              values = s"(key, value) VALUES (s.key, $nonDeterministicAction)"))
+        }
+    }
+  }
+
+  test("allow nondeterministic updateNotMatched action in merge") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: (2, 20) :: Nil,
+      target = (-1, -10) :: (1, 1) :: (2, 2) :: Nil
+    ) {
+      case (sourceName, targetName) =>
+        val nonDeterministicAction = "rand()"
+        noException should be thrownBy {
+          executeMerge(
+            tgt = s"$targetName t",
+            src = s"$sourceName s",
+            cond = s"s.key = t.key",
+            updateNotMatched(
+              condition = "t.key < 2",
+              set = s"key = t.key, value = t.value + $nonDeterministicAction"))
+        }
+    }
+  }
+
   test("merge into a dataset temp views with star") {
     withTempView("v") {
       def testMergeWithView(testClue: String): Unit = {