[Spark] Execute MERGE using Dataframe API in Scala (#3456)
## Description
Due to Spark's unfortunate behavior of trying to resolve plan nodes it doesn't
know about, the `DeltaMergeInto` plan created when using the MERGE Scala API
needs to be manually resolved to ensure Spark doesn't interfere with its
analysis.
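
For reference, the Scala API in question is `DeltaMergeBuilder`, driven from `io.delta.tables.DeltaTable`. A minimal usage sketch (the table paths and column names below are hypothetical, purely for illustration):

```scala
import io.delta.tables.DeltaTable

// Hypothetical paths and columns, for illustration only.
val target = DeltaTable.forPath(spark, "/tmp/delta/target")
val source = spark.read.format("delta").load("/tmp/delta/source")

target.as("t")
  .merge(source.as("s"), "s.key = t.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute() // internally builds a DeltaMergeInto plan that Delta must resolve itself
```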

This currently bypasses Spark's analysis entirely, as we then manually
execute the MERGE command. That has negative side effects, e.g. the execution
is not visible to QueryExecutionListener.
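
For illustration, a minimal sketch of Spark's public listener API: a listener registered like this fires when a command is executed through a Dataset action, so invoking `MergeIntoCommand.run()` directly never surfaced the MERGE to it.

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // Fires for commands executed through the Dataset API; direct
    // command.run() calls bypass it entirely.
    println(s"executed: ${qe.logical.getClass.getSimpleName} in ${durationNs / 1e6} ms")
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})
```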

This change addresses the issue by executing the plan using the
Dataframe API after it has been manually resolved, so that the command goes
through the regular code path.
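
Conceptually, the `toDataset` call in the diff below wraps the already-resolved plan in a DataFrame and hands it back to Spark for execution. A rough sketch of the idea, assuming a helper along these lines (`Dataset.ofRows` is `private[sql]`, so this only compiles from code living under the `org.apache.spark.sql` package, as Delta's does):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch: execute the resolved command plan through Spark's regular query
// execution, so listeners, metrics, and the SQL UI all observe it.
def toDataset(sparkSession: SparkSession, plan: LogicalPlan): DataFrame =
  Dataset.ofRows(sparkSession, plan)

// Before: mergeIntoCommand.asInstanceOf[MergeIntoCommand].run(sparkSession)
// After:  toDataset(sparkSession, mergeIntoCommand)
```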

Resolves #1521
## How was this patch tested?
Covered by existing tests.
johanl-db authored Aug 1, 2024
1 parent eb719f8 commit 9ca8e82
Showing 3 changed files with 46 additions and 17 deletions.
@@ -323,7 +323,7 @@ class DeltaMergeBuilder private(
       // Resolve UpCast expressions that `PreprocessTableMerge` may have introduced.
       mergeIntoCommand = PostHocResolveUpCast(sparkSession).apply(mergeIntoCommand)
       sparkSession.sessionState.analyzer.checkAnalysis(mergeIntoCommand)
-      mergeIntoCommand.asInstanceOf[MergeIntoCommand].run(sparkSession)
+      toDataset(sparkSession, mergeIntoCommand)
     }
   }

@@ -23,6 +23,7 @@ import java.util.Locale
 import scala.language.implicitConversions
 
 import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord}
+import org.apache.spark.sql.delta.commands.MergeIntoCommand
 import org.apache.spark.sql.delta.commands.merge.MergeStats
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
@@ -3362,6 +3363,25 @@ abstract class MergeIntoSuiteBase
     "delta.dml.merge.findTouchedFiles",
     "delta.dml.merge.writeInsertsOnlyWhenNoMatches",
     "delta.dml.merge")
+
+  test("merge execution is recorded with QueryExecutionListener") {
+    withKeyValueData(
+      source = (0, 0) :: (1, 10) :: Nil,
+      target = (1, 1) :: (2, 2) :: Nil) { case (sourceName, targetName) =>
+      val plans = withLogicalPlansCaptured(spark, optimizedPlan = false) {
+        executeMerge(
+          tgt = s"$targetName t",
+          src = s"$sourceName s",
+          cond = "s.key = t.key",
+          update(set = "*"))
+      }
+      val mergeCommands = plans.collect {
+        case m: MergeIntoCommand => m
+      }
+      assert(mergeCommands.size === 1,
+        "Merge command wasn't properly recorded by QueryExecutionListener")
+    }
+  }
 }


@@ -26,21 +26,15 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSparkSession
 
-trait SchemaValidationSuiteBase extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {
-
-  def checkMergeException(e: Exception, col: String): Unit = {
-    assert(e.isInstanceOf[MetadataChangedException])
-    assert(e.getMessage.contains(
-      "The metadata of the Delta table has been changed by a concurrent update"))
-  }
-}
-
 /**
  * This suite tests the behavior of Delta commands when a schema-altering commit is run after the
  * command completes analysis but before the command starts the transaction. We want to make sure
  * that we do not corrupt tables.
  */
-class SchemaValidationSuite extends SchemaValidationSuiteBase {
+class SchemaValidationSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLCommandTest {
 
   class BlockingRule(
     blockActionLatch: CountDownLatch,
@@ -331,7 +325,7 @@ class SchemaValidationSuite extends SchemaValidationSuiteBase {
 
   /**
    * Concurrently drop column in merge condition. Merge command detects the schema change while
-   * resolving the target and throws an AnalysisException
+   * resolving the target and throws a DeltaAnalysisException
    */
   testConcurrentChange("merge - remove a column in merge condition concurrently")(
     createTable = (spark: SparkSession, tblPath: String) => {
@@ -343,7 +337,7 @@ class SchemaValidationSuite extends SchemaValidationSuiteBase {
     actionToTest = (spark: SparkSession, tblPath: String) => {
       val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tblPath)
       val sourceDf = spark.range(10).withColumn("col2", lit(2))
-      val e = intercept[Exception] {
+      val e = intercept[DeltaAnalysisException] {
         deltaTable.as("t1")
           .merge(sourceDf.as("t2"), "t1.id == t2.id")
           .whenNotMatched()
@@ -352,14 +346,22 @@ class SchemaValidationSuite extends SchemaValidationSuiteBase {
           .updateAll()
           .execute()
       }
-      checkMergeException(e, "id")
+
+      checkErrorMatchPVals(
+        exception = e,
+        errorClass = "DELTA_SCHEMA_CHANGE_SINCE_ANALYSIS",
+        parameters = Map(
+          "schemaDiff" -> ".*id.*",
+          "legacyFlagMessage" -> ""
+        )
+      )
     },
     concurrentChange = dropColFromSampleTable("id")
   )
 
   /**
    * Concurrently drop column not in merge condition but in target. Merge command detects the schema
-   * change while resolving the target and throws an AnalysisException
+   * change while resolving the target and throws a DeltaAnalysisException
    */
   testConcurrentChange("merge - remove a column not in merge condition concurrently")(
     createTable = (spark: SparkSession, tblPath: String) => {
@@ -371,7 +373,7 @@ class SchemaValidationSuite extends SchemaValidationSuiteBase {
     actionToTest = (spark: SparkSession, tblPath: String) => {
       val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tblPath)
       val sourceDf = spark.range(10).withColumn("col2", lit(2))
-      val e = intercept[Exception] {
+      val e = intercept[DeltaAnalysisException] {
         deltaTable.as("t1")
           .merge(sourceDf.as("t2"), "t1.id == t2.id")
           .whenNotMatched()
@@ -380,7 +382,14 @@ class SchemaValidationSuite extends SchemaValidationSuiteBase {
           .updateAll()
           .execute()
       }
-      checkMergeException(e, "col2")
+      checkErrorMatchPVals(
+        exception = e,
+        errorClass = "DELTA_SCHEMA_CHANGE_SINCE_ANALYSIS",
+        parameters = Map(
+          "schemaDiff" -> ".*col2.*",
+          "legacyFlagMessage" -> ""
+        )
+      )
     },
     concurrentChange = dropColFromSampleTable("col2")
   )
