Materialize non-deterministic source in MERGE
- This PR fixes #527
- MERGE consists of two passes. During the first pass, it scans the target table to find all files that are affected by the MERGE operation. During the second pass, it reads those files again to update/insert the rows from the source table.
- If the source changes between the two passes and now contains a row that exists in the target table but not in any of the files identified in pass 1, MERGE will insert that row instead of updating the original row, producing duplicate rows.
- This can happen if the source is non-deterministic. A source is classified as non-deterministic if any operator in the source plan is non-deterministic (i.e. it depends on some mutable internal state or on input that is not part of its children's output), or if the source is a non-Delta scan (see the first sketch after this list).
- We solve this issue by materializing the source table at the start of a MERGE operation if it is non-deterministic, removing the possibility that the table changes between the two passes. The logic of source materialization is encapsulated in `MergeIntoMaterializeSource` and is used by `MergeIntoCommand`.
- The source is materialized onto the executors' local disks using an RDD local checkpoint. A retry loop handles the case where the materialized RDD blocks are lost, e.g. due to Spot instance kills; the second sketch after this list illustrates the mechanism. When autoscaling through Spark dynamic allocation, executor decommissioning can be enabled with the following configs to gracefully migrate the blocks.
```
spark.decommission.enabled=true
spark.storage.decommission.rddBlocks.enabled=true
```
- When materializing the source table we lose the statistics and inferred constraints about the table, which can lead to regressions. We include a manual broadcast hint on the source table if it is small, ensuring that we choose the most efficient join when possible, and a "dummy" filter to re-introduce the constraints that can be used for further filter inference (see the third sketch below). apache/spark#37248 has been implemented to make this work out of the box in Spark 3.4, so these workarounds can be removed then.
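
First, a sketch of the non-determinism classification described above, assuming a simplified `isDeltaScan` predicate passed in as a parameter rather than the real detection logic:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch: a source is treated as non-deterministic if any operator in its plan
// carries a non-deterministic expression, or if any leaf is not a Delta scan.
def isSourceNonDeterministic(
    plan: LogicalPlan,
    isDeltaScan: LogicalPlan => Boolean): Boolean = {
  val hasNonDeterministicOperator =
    plan.find(op => op.expressions.exists(e => !e.deterministic)).isDefined
  val hasNonDeltaScan = plan.collectLeaves().exists(leaf => !isDeltaScan(leaf))
  hasNonDeterministicOperator || hasNonDeltaScan
}
```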
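
Second, a minimal sketch of the materialization-with-retries mechanism; the helper names, retry limit, and error check are illustrative and not the actual `MergeIntoMaterializeSource` API:

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

object MaterializeSourceSketch {
  // Assumed retry limit, not the actual Delta configuration value.
  val maxAttempts = 4

  // Pin the source onto executor-local disks and cut its lineage with an RDD
  // local checkpoint, so the data can no longer change between the two passes.
  def materialize(spark: SparkSession, source: DataFrame): DataFrame = {
    val rdd = source.rdd.persist(StorageLevel.DISK_ONLY).localCheckpoint()
    spark.createDataFrame(rdd, source.schema)
  }

  // Re-run the merge body if the locally checkpointed blocks are lost
  // (e.g. because the executor holding them was killed).
  def runWithRetries(spark: SparkSession, source: DataFrame)
      (mergeBody: DataFrame => Seq[Row]): Seq[Row] = {
    var attempt = 0
    var result: Option[Seq[Row]] = None
    while (result.isEmpty) {
      attempt += 1
      val materializedSource = materialize(spark, source)
      try {
        result = Some(mergeBody(materializedSource))
      } catch {
        // The real command inspects the error to confirm it was caused by
        // lost RDD blocks before deciding to retry.
        case _: SparkException if attempt < maxAttempts => ()
      }
    }
    result.get
  }
}
```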
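
Third, a hedged sketch of the statistics/constraints workaround; the size threshold and the join-key column are assumptions for illustration:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

// After materialization the optimizer no longer sees the original source plan,
// so its size estimate and inferred constraints are gone. Re-attach them by hand.
def addHintsToMaterializedSource(
    materialized: DataFrame,
    sourceSizeInBytes: Long,
    broadcastThreshold: Long): DataFrame = {
  // "Dummy" filter: re-assert a constraint known to hold on the source
  // (here an assumed non-null join key) so it can feed further filter inference.
  val withConstraints = materialized.filter(materialized("id").isNotNull)
  // Manual broadcast hint when the pre-materialization size estimate is small,
  // so a broadcast hash join can still be chosen.
  if (sourceSizeInBytes <= broadcastThreshold) broadcast(withConstraints)
  else withConstraints
}
```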

Closes #1418

GitOrigin-RevId: f8cd57e28b52c58ed7ba0b44ae868d5ea5bd534c
fred-db authored and allisonport-db committed Nov 28, 2022
1 parent 6a2725a commit b07257d
Showing 6 changed files with 945 additions and 26 deletions.
@@ -23,6 +23,7 @@ import scala.collection.mutable

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}
import org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -121,7 +122,12 @@ case class MergeStats(
targetRowsCopied: Long,
targetRowsUpdated: Long,
targetRowsInserted: Long,
targetRowsDeleted: Long
targetRowsDeleted: Long,

// MergeMaterializeSource stats
materializeSourceReason: Option[String] = None,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
materializeSourceAttempts: Option[Long] = None
)

object MergeStats {
@@ -220,7 +226,11 @@ case class MergeIntoCommand(
matchedClauses: Seq[DeltaMergeIntoMatchedClause],
notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause],
migratedSchema: Option[StructType]) extends LeafRunnableCommand
with DeltaCommand with PredicateHelper with AnalysisHelper with ImplicitMetadataOperation {
with DeltaCommand
with PredicateHelper
with AnalysisHelper
with ImplicitMetadataOperation
with MergeIntoMaterializeSource {

import MergeIntoCommand._

@@ -313,7 +323,18 @@ case class MergeIntoCommand(
)
}
}
val (materializeSource, _) = shouldMaterializeSource(spark, source, isSingleInsertOnly)
if (!materializeSource) {
runMerge(spark)
} else {
// If it is determined that the source should be materialized, wrap the execution with retries,
// in case the data of the materialized source is lost.
runWithMaterializedSourceLostRetries(
spark, targetFileIndex.deltaLog, metrics, runMerge)
}
}

protected def runMerge(spark: SparkSession): Seq[Row] = {
recordDeltaOperation(targetDeltaLog, "delta.dml.merge") {
val startTime = System.nanoTime()
targetDeltaLog.withNewTransaction { deltaTxn =>
@@ -329,6 +350,16 @@
isOverwriteMode = false, rearrangeOnly = false)
}

// If materialized, prepare the DF reading the materialized source.
// Otherwise, prepare a regular DF from the source plan.
val materializeSourceReason = prepareSourceDFAndReturnMaterializeReason(
spark,
source,
condition,
matchedClauses,
notMatchedClauses,
isSingleInsertOnly)

val deltaActions = {
if (isSingleInsertOnly && spark.conf.get(DeltaSQLConf.MERGE_INSERT_ONLY_ENABLED)) {
writeInsertsOnlyWhenNoMatchedClauses(spark, deltaTxn)
@@ -363,9 +394,13 @@
notMatchedClauses.map(DeltaOperations.MergePredicate(_))))

// Record metrics
val stats = MergeStats.fromMergeSQLMetrics(
var stats = MergeStats.fromMergeSQLMetrics(
metrics, condition, matchedClauses, notMatchedClauses,
deltaTxn.metadata.partitionColumns.nonEmpty)
stats = stats.copy(
materializeSourceReason = Some(materializeSourceReason.toString),
materializeSourceAttempts = Some(attempt))

recordDeltaEvent(targetFileIndex.deltaLog, "delta.dml.merge.stats", data = stats)

}
@@ -407,7 +442,7 @@

// UDF to increment metrics
val incrSourceRowCountExpr = makeMetricUpdateUDF("numSourceRows")
val sourceDF = Dataset.ofRows(spark, source)
val sourceDF = getSourceDF()
.filter(new Column(incrSourceRowCountExpr))

// Apply inner join between source and target using the merge condition to find matches
@@ -520,7 +555,7 @@ }
}

// source DataFrame
val sourceDF = Dataset.ofRows(spark, source)
val sourceDF = getSourceDF()
.filter(new Column(incrSourceRowCountExpr))
.filter(new Column(notMatchedClauses.head.condition.getOrElse(Literal.TrueLiteral)))

@@ -641,7 +676,7 @@ // We add row IDs to the targetDF if we have a delete-when-matched clause with duplicate
// We add row IDs to the targetDF if we have a delete-when-matched clause with duplicate
// matches and CDC is enabled, and additionally add row IDs to the source if we also have an
// insert clause. See above at isDeleteWithDuplicateMatchesAndCdc definition for more details.
var sourceDF = Dataset.ofRows(spark, source)
var sourceDF = getSourceDF()
.withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr))
var targetDF = Dataset.ofRows(spark, newTarget)
.withColumn(TARGET_ROW_PRESENT_COL, lit(true))