[Spark][UPDATE with DV] Let UPDATE command write DVs
This is the first PR for [[Feature Request] Support UPDATE command with Deletion Vectors](#1923).

This PR introduces an `UPDATE_USE_PERSISTENT_DELETION_VECTORS` config to enable/disable writing DVs for the UPDATE command. In short, rows being updated are marked as deleted by a DV, while the updated rows are written to a new file. When CDF is enabled, the updated rows and the CDC rows (`preimage` and `postimage`) are written to the new file.

Tested with new, preliminary tests.

Yes, this introduces a user-facing change: when `UPDATE_USE_PERSISTENT_DELETION_VECTORS` is set to true, the `UPDATE` command no longer rewrites whole files; it writes only the rows being updated.
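
For illustration, here is a minimal usage sketch. The table property `delta.enableDeletionVectors` is the documented way to enable DVs on a Delta table; the SQL conf key used below for `UPDATE_USE_PERSISTENT_DELETION_VECTORS` is an assumption and is not shown in this commit.

```scala
import org.apache.spark.sql.SparkSession

object UpdateWithDvSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("update-with-dv-sketch").getOrCreate()

    // Create a Delta table with deletion vectors enabled.
    spark.sql("""
      CREATE TABLE IF NOT EXISTS events (id BIGINT, status STRING)
      USING delta
      TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')
    """)

    // Assumed SQL conf backing UPDATE_USE_PERSISTENT_DELETION_VECTORS;
    // the exact key is hypothetical here.
    spark.conf.set("spark.databricks.delta.update.deletionVectors.persistent", "true")

    // With the flag on, UPDATE marks the old copies of the matched rows as deleted
    // via a DV and writes only the updated rows to a new file, instead of rewriting
    // every touched file in full.
    spark.sql("UPDATE events SET status = 'done' WHERE id % 2 = 0")
  }
}
```

The point of the flag is to reduce write amplification: a selective update no longer forces a full rewrite of each touched file; the untouched rows stay in place and are simply masked by the DV.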

Closes #1942

Signed-off-by: Paddy Xu <xupaddy@gmail.com>
GitOrigin-RevId: 3ad7c251bb064420d17cd1e685265e61845096a7
xupefei authored and vkorukanti committed Oct 6, 2023
1 parent bbf19c3 commit 0a0ea97
Showing 10 changed files with 403 additions and 86 deletions.
@@ -36,7 +36,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.paths.SparkPath
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceMetadataAttribute, GenericInternalRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation}
@@ -47,11 +47,13 @@ import org.apache.spark.util.{SerializableConfiguration, Utils => SparkUtils}


/**
-* Contains utility classes and method to delete rows in a table using the Deletion Vectors.
+* Contains utility classes and method for performing DML operations with Deletion Vectors.
*/
-object DeleteWithDeletionVectorsHelper extends DeltaCommand {
+object DMLWithDeletionVectorsHelper extends DeltaCommand {
+val SUPPORTED_DML_COMMANDS: Seq[String] = Seq("DELETE", "UPDATE")
+
/**
-* Creates a DataFrame that can be used to scan for rows matching DELETE condition in given
+* Creates a DataFrame that can be used to scan for rows matching the condition in the given
* files. Generally the given file list is a pruned file list using the stats based pruning.
*/
def createTargetDfForScanningForMatches(
@@ -114,8 +116,14 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
deltaLog: DeltaLog,
targetDf: DataFrame,
fileIndex: TahoeFileIndex,
-condition: Expression): Seq[TouchedFileWithDV] = {
-recordDeltaOperation(deltaLog, opType = "DELETE.findTouchedFiles") {
+condition: Expression,
+opName: String): Seq[TouchedFileWithDV] = {
+require(
+SUPPORTED_DML_COMMANDS.contains(opName),
+s"Expecting opName to be one of ${SUPPORTED_DML_COMMANDS.mkString(", ")}, " +
+s"but got '$opName'.")
+
+recordDeltaOperation(deltaLog, opType = s"$opName.findTouchedFiles") {
val candidateFiles = fileIndex match {
case f: TahoeBatchFileIndex => f.addFiles
case _ => throw new IllegalArgumentException("Unexpected file index found!")
@@ -165,7 +173,7 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
spark: SparkSession,
touchedFiles: Seq[TouchedFileWithDV],
snapshot: Snapshot): (Seq[FileAction], Map[String, Long]) = {
-val numDeletedRows: Long = touchedFiles.map(_.numberOfModifiedRows).sum
+val numModifiedRows: Long = touchedFiles.map(_.numberOfModifiedRows).sum
val numRemovedFiles: Long = touchedFiles.count(_.isFullyReplaced())

val (fullyRemovedFiles, notFullyRemovedFiles) = touchedFiles.partition(_.isFullyReplaced())
@@ -192,7 +200,7 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand {
}
numDeletionVectorsRemoved += fullyRemoved.count(_.deletionVector != null)
val metricMap = Map(
-"numDeletedRows" -> numDeletedRows,
+"numModifiedRows" -> numModifiedRows,
"numRemovedFiles" -> numRemovedFiles,
"numDeletionVectorsAdded" -> numDeletionVectorsAdded,
"numDeletionVectorsRemoved" -> numDeletionVectorsRemoved,
@@ -485,8 +493,8 @@ object DeletionVectorData {
}

/** Final output for each file containing the file path, DeletionVectorDescriptor and how many
-* rows are marked as deleted in this file as part of the this DELETE (doesn't include already
-* rows marked as deleted)
+* rows are marked as deleted in this file as part of the this operation (doesn't include rows that
+* are already marked as deleted).
*
* @param filePath Absolute path of the data file this DV result is generated for.
* @param deletionVector Deletion vector generated containing the newly deleted row indices from
@@ -643,7 +651,7 @@ object DeletionVectorWriter extends DeltaLogging {
}

/**
-* Prepares a mapper function that can be used by DELETE command to store the Deletion Vectors
+* Prepares a mapper function that can be used by DML commands to store the Deletion Vectors
* that are in described in [[DeletionVectorData]] and return their descriptors
* [[DeletionVectorResult]].
*/
@@ -16,6 +16,8 @@

package org.apache.spark.sql.delta.commands

+import java.util.concurrent.TimeUnit
+
import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, FileAction}
@@ -253,7 +255,7 @@ case class DeleteCommand(
val fileIndex = new TahoeBatchFileIndex(
sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot)
if (shouldWriteDVs) {
-val targetDf = DeleteWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
+val targetDf = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
sparkSession,
target,
fileIndex)
@@ -262,21 +264,22 @@
// with deletion vectors.
val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)

-val touchedFiles = DeleteWithDeletionVectorsHelper.findTouchedFiles(
+val touchedFiles = DMLWithDeletionVectorsHelper.findTouchedFiles(
sparkSession,
txn,
mustReadDeletionVectors,
deltaLog,
targetDf,
fileIndex,
-cond)
+cond,
+opName = "DELETE")

if (touchedFiles.nonEmpty) {
-val (actions, metricMap) = DeleteWithDeletionVectorsHelper.processUnmodifiedData(
+val (actions, metricMap) = DMLWithDeletionVectorsHelper.processUnmodifiedData(
sparkSession,
touchedFiles,
txn.snapshot)
-metrics("numDeletedRows").set(metricMap("numDeletedRows"))
+metrics("numDeletedRows").set(metricMap("numModifiedRows"))
numDeletionVectorsAdded = metricMap("numDeletionVectorsAdded")
numDeletionVectorsRemoved = metricMap("numDeletionVectorsRemoved")
numDeletionVectorsUpdated = metricMap("numDeletionVectorsUpdated")
@@ -342,7 +345,8 @@ case class DeleteCommand(
}
numAddedChangeFiles = changeFiles.size
changeFileBytes = changeFiles.collect { case f: AddCDCFile => f.size }.sum
-rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs
+rewriteTimeMs =
+TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) - scanTimeMs
numDeletedRows = Some(metrics("numDeletedRows").value)
numCopiedRows =
Some(metrics("numTouchedRows").value - metrics("numDeletedRows").value)