diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteWithDeletionVectorsHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala similarity index 96% rename from spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteWithDeletionVectorsHelper.scala rename to spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala index aed7d7dcba0..9dd175a1350 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteWithDeletionVectorsHelper.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala @@ -36,7 +36,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.paths.SparkPath -import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceMetadataAttribute, GenericInternalRow} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation} @@ -47,11 +47,13 @@ import org.apache.spark.util.{SerializableConfiguration, Utils => SparkUtils} /** - * Contains utility classes and method to delete rows in a table using the Deletion Vectors. + * Contains utility classes and methods for performing DML operations with Deletion Vectors. */ -object DeleteWithDeletionVectorsHelper extends DeltaCommand { +object DMLWithDeletionVectorsHelper extends DeltaCommand { + val SUPPORTED_DML_COMMANDS: Seq[String] = Seq("DELETE", "UPDATE") + /** - * Creates a DataFrame that can be used to scan for rows matching DELETE condition in given + * Creates a DataFrame that can be used to scan for rows matching the condition in the given * files. Generally the given file list is a pruned file list using the stats based pruning.
*/ def createTargetDfForScanningForMatches( @@ -114,8 +116,14 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand { deltaLog: DeltaLog, targetDf: DataFrame, fileIndex: TahoeFileIndex, - condition: Expression): Seq[TouchedFileWithDV] = { - recordDeltaOperation(deltaLog, opType = "DELETE.findTouchedFiles") { + condition: Expression, + opName: String): Seq[TouchedFileWithDV] = { + require( + SUPPORTED_DML_COMMANDS.contains(opName), + s"Expecting opName to be one of ${SUPPORTED_DML_COMMANDS.mkString(", ")}, " + + s"but got '$opName'.") + + recordDeltaOperation(deltaLog, opType = s"$opName.findTouchedFiles") { val candidateFiles = fileIndex match { case f: TahoeBatchFileIndex => f.addFiles case _ => throw new IllegalArgumentException("Unexpected file index found!") @@ -165,7 +173,7 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand { spark: SparkSession, touchedFiles: Seq[TouchedFileWithDV], snapshot: Snapshot): (Seq[FileAction], Map[String, Long]) = { - val numDeletedRows: Long = touchedFiles.map(_.numberOfModifiedRows).sum + val numModifiedRows: Long = touchedFiles.map(_.numberOfModifiedRows).sum val numRemovedFiles: Long = touchedFiles.count(_.isFullyReplaced()) val (fullyRemovedFiles, notFullyRemovedFiles) = touchedFiles.partition(_.isFullyReplaced()) @@ -192,7 +200,7 @@ object DeleteWithDeletionVectorsHelper extends DeltaCommand { } numDeletionVectorsRemoved += fullyRemoved.count(_.deletionVector != null) val metricMap = Map( - "numDeletedRows" -> numDeletedRows, + "numModifiedRows" -> numModifiedRows, "numRemovedFiles" -> numRemovedFiles, "numDeletionVectorsAdded" -> numDeletionVectorsAdded, "numDeletionVectorsRemoved" -> numDeletionVectorsRemoved, @@ -485,8 +493,8 @@ object DeletionVectorData { } /** Final output for each file containing the file path, DeletionVectorDescriptor and how many - * rows are marked as deleted in this file as part of the this DELETE (doesn't include already - * rows marked as deleted) + * rows are marked as deleted in this file as part of this operation (doesn't include rows that + * are already marked as deleted). * * @param filePath Absolute path of the data file this DV result is generated for. * @param deletionVector Deletion vector generated containing the newly deleted row indices from @@ -643,7 +651,7 @@ object DeletionVectorWriter extends DeltaLogging { } /** - * Prepares a mapper function that can be used by DELETE command to store the Deletion Vectors + * Prepares a mapper function that can be used by DML commands to store the Deletion Vectors * that are in described in [[DeletionVectorData]] and return their descriptors * [[DeletionVectorResult]].
*/ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala index 18bdac77904..98c11f5a5c2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta.commands +import java.util.concurrent.TimeUnit + import org.apache.spark.sql.delta.metric.IncrementMetric import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, FileAction} @@ -253,7 +255,7 @@ case class DeleteCommand( val fileIndex = new TahoeBatchFileIndex( sparkSession, "delete", candidateFiles, deltaLog, deltaLog.dataPath, txn.snapshot) if (shouldWriteDVs) { - val targetDf = DeleteWithDeletionVectorsHelper.createTargetDfForScanningForMatches( + val targetDf = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches( sparkSession, target, fileIndex) @@ -262,21 +264,22 @@ case class DeleteCommand( // with deletion vectors. val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot) - val touchedFiles = DeleteWithDeletionVectorsHelper.findTouchedFiles( + val touchedFiles = DMLWithDeletionVectorsHelper.findTouchedFiles( sparkSession, txn, mustReadDeletionVectors, deltaLog, targetDf, fileIndex, - cond) + cond, + opName = "DELETE") if (touchedFiles.nonEmpty) { - val (actions, metricMap) = DeleteWithDeletionVectorsHelper.processUnmodifiedData( + val (actions, metricMap) = DMLWithDeletionVectorsHelper.processUnmodifiedData( sparkSession, touchedFiles, txn.snapshot) - metrics("numDeletedRows").set(metricMap("numDeletedRows")) + metrics("numDeletedRows").set(metricMap("numModifiedRows")) numDeletionVectorsAdded = metricMap("numDeletionVectorsAdded") numDeletionVectorsRemoved = metricMap("numDeletionVectorsRemoved") numDeletionVectorsUpdated = metricMap("numDeletionVectorsUpdated") @@ -342,7 +345,8 @@ case class DeleteCommand( } numAddedChangeFiles = changeFiles.size changeFileBytes = changeFiles.collect { case f: AddCDCFile => f.size }.sum - rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs + rewriteTimeMs = + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) - scanTimeMs numDeletedRows = Some(metrics("numDeletedRows").value) numCopiedRows = Some(metrics("numTouchedRows").value - metrics("numDeletedRows").value) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala index 35ad600c191..282910b5449 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql.delta.commands // scalastyle:off import.ordering.noEmptyLine +import java.util.concurrent.TimeUnit + import org.apache.spark.sql.delta.metric.IncrementMetric import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaOperations, DeltaTableUtils, OptimisticTransaction} import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction} import org.apache.spark.sql.delta.commands.cdc.CDCReader.{CDC_TYPE_COLUMN_NAME, CDC_TYPE_NOT_CDC, CDC_TYPE_UPDATE_POSTIMAGE, CDC_TYPE_UPDATE_PREIMAGE} import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex} +import org.apache.spark.sql.delta.sources.DeltaSQLConf 
import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext @@ -119,12 +122,18 @@ case class UpdateCommand( val (metadataPredicates, dataPredicates) = DeltaTableUtils.splitMetadataAndDataPredicates( updateCondition, txn.metadata.partitionColumns, sparkSession) - val candidateFiles = txn.filterFiles(metadataPredicates ++ dataPredicates) + + // Should we write the DVs to represent updated rows? + val shouldWriteDeletionVectors = shouldWritePersistentDeletionVectors(sparkSession, txn) + val candidateFiles = txn.filterFiles( + metadataPredicates ++ dataPredicates, + keepNumRecords = shouldWriteDeletionVectors) + val nameToAddFile = generateCandidateFileMap(deltaLog.dataPath, candidateFiles) - scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 + scanTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) - val filesToRewrite: Seq[AddFile] = if (candidateFiles.isEmpty) { + val filesToRewrite: Seq[TouchedFileWithDV] = if (candidateFiles.isEmpty) { // Case 1: Do nothing if no row qualifies the partition predicates // that are part of Update condition Nil @@ -132,70 +141,125 @@ case class UpdateCommand( // Case 2: Update all the rows from the files that are in the specified partitions // when the data filter is empty candidateFiles + .map(f => TouchedFileWithDV(f.path, f, newDeletionVector = null, deletedRows = 0L)) } else { // Case 3: Find all the affected files using the user-specified condition val fileIndex = new TahoeBatchFileIndex( sparkSession, "update", candidateFiles, deltaLog, tahoeFileIndex.path, txn.snapshot) - // Keep everything from the resolved target except a new TahoeFileIndex - // that only involves the affected files instead of all files. - val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex) - val data = Dataset.ofRows(sparkSession, newTarget) - val incrUpdatedCountExpr = IncrementMetric(TrueLiteral, metrics("numUpdatedRows")) - val pathsToRewrite = - withStatusCode("DELTA", UpdateCommand.FINDING_TOUCHED_FILES_MSG) { - data.filter(new Column(updateCondition)) - .select(input_file_name()) - .filter(new Column(incrUpdatedCountExpr)) - .distinct() - .as[String] - .collect() - } - - scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - pathsToRewrite.map(getTouchedFile(deltaLog.dataPath, _, nameToAddFile)).toSeq + val touchedFilesWithDV = if (shouldWriteDeletionVectors) { + // Case 3.1: Find all the affected files via the DV path + val targetDf = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches( + sparkSession, + target, + fileIndex) + + // Does the target table already have DVs enabled? If so, we need to read the table + // with deletion vectors. + val mustReadDeletionVectors = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot) + + DMLWithDeletionVectorsHelper.findTouchedFiles( + sparkSession, + txn, + mustReadDeletionVectors, + deltaLog, + targetDf, + fileIndex, + updateCondition, + opName = "UPDATE") + } else { + // Case 3.2: Find all the affected files using the non-DV path + // Keep everything from the resolved target except a new TahoeFileIndex + // that only involves the affected files instead of all files.
+ val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex) + val data = Dataset.ofRows(sparkSession, newTarget) + val incrUpdatedCountExpr = IncrementMetric(TrueLiteral, metrics("numUpdatedRows")) + val pathsToRewrite = + withStatusCode("DELTA", UpdateCommand.FINDING_TOUCHED_FILES_MSG) { + data.filter(new Column(updateCondition)) + .select(input_file_name()) + .filter(new Column(incrUpdatedCountExpr)) + .distinct() + .as[String] + .collect() + } + + // Wrap each AddFile into a TouchedFileWithDV with an empty DV. + pathsToRewrite + .map(getTouchedFile(deltaLog.dataPath, _, nameToAddFile)) + .map(f => TouchedFileWithDV(f.path, f, newDeletionVector = null, deletedRows = 0L)) + .toSeq + } + // Refresh scan time for Case 3, since we performed the scan here. + scanTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) + touchedFilesWithDV } - numTouchedFiles = filesToRewrite.length - - val newActions = if (filesToRewrite.isEmpty) { - // Do nothing if no row qualifies the UPDATE condition - Nil - } else { - // Generate the new files containing the updated values - withStatusCode("DELTA", UpdateCommand.rewritingFilesMsg(filesToRewrite.size)) { - rewriteFiles(sparkSession, txn, tahoeFileIndex.path, - filesToRewrite.map(_.path), nameToAddFile, updateCondition) + val totalActions = { + // When DV is on, we first mask removed rows with DVs and generate (remove, add) pairs. + val actionsForExistingFiles = if (shouldWriteDeletionVectors) { + // When there's no data predicate, all matched files are removed. + if (dataPredicates.isEmpty) { + val operationTimestamp = System.currentTimeMillis() + filesToRewrite.map(_.fileLogEntry.removeWithTimestamp(operationTimestamp)) + } else { + // When there is a data predicate, we generate (remove, add) pairs. + val filesToRewriteWithDV = filesToRewrite.filter(_.newDeletionVector != null) + val (dvActions, metricMap) = DMLWithDeletionVectorsHelper.processUnmodifiedData( + sparkSession, + filesToRewriteWithDV, + txn.snapshot) + metrics("numUpdatedRows").set(metricMap("numModifiedRows")) + numTouchedFiles = metricMap("numRemovedFiles") + dvActions + } + } else { + // Without DVs, we'll leave the job to `rewriteFiles`. + Nil } - } - rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs + // When DV is on, we write out updated rows only. The return value will be only `add` actions. + // When DV is off, we write out updated rows plus unmodified rows from the same file, then + // return `add` and `remove` actions.
+ val rewriteStartNs = System.nanoTime() + val actionsForNewFiles = + withStatusCode("DELTA", UpdateCommand.rewritingFilesMsg(filesToRewrite.size)) { + if (filesToRewrite.nonEmpty) { + rewriteFiles( + sparkSession, + txn, + rootPath = tahoeFileIndex.path, + inputLeafFiles = filesToRewrite.map(_.fileLogEntry), + nameToAddFileMap = nameToAddFile, + condition = updateCondition, + generateRemoveFileActions = !shouldWriteDeletionVectors, + copyUnmodifiedRows = !shouldWriteDeletionVectors) + } else { + Nil + } + } + rewriteTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - rewriteStartNs) - val (changeActions, addActions) = newActions.partition(_.isInstanceOf[AddCDCFile]) - numRewrittenFiles = addActions.size - numAddedBytes = addActions.map(_.getFileSize).sum - numAddedChangeFiles = changeActions.size - changeFileBytes = changeActions.collect { case f: AddCDCFile => f.size }.sum + numTouchedFiles = filesToRewrite.length + val (addActions, removeActions) = actionsForNewFiles.partition(_.isInstanceOf[AddFile]) + numRewrittenFiles = addActions.size + numAddedBytes = addActions.map(_.getFileSize).sum + numRemovedBytes = removeActions.map(_.getFileSize).sum - val totalActions = if (filesToRewrite.isEmpty) { - // Do nothing if no row qualifies the UPDATE condition - Nil - } else { - // Delete the old files and return those delete actions along with the new AddFile actions for - // files containing the updated values - val operationTimestamp = System.currentTimeMillis() - val deleteActions = filesToRewrite.map(_.removeWithTimestamp(operationTimestamp)) - numRemovedBytes = filesToRewrite.map(_.getFileSize).sum - deleteActions ++ newActions + actionsForExistingFiles ++ actionsForNewFiles } + val changeActions = totalActions.collect { case f: AddCDCFile => f } + numAddedChangeFiles = changeActions.size + changeFileBytes = changeActions.map(_.size).sum + metrics("numAddedFiles").set(numRewrittenFiles) metrics("numAddedBytes").set(numAddedBytes) metrics("numAddedChangeFiles").set(numAddedChangeFiles) metrics("changeFileBytes").set(changeFileBytes) metrics("numRemovedFiles").set(numTouchedFiles) metrics("numRemovedBytes").set(numRemovedBytes) - metrics("executionTimeMs").set((System.nanoTime() - startTime) / 1000 / 1000) + metrics("executionTimeMs").set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) metrics("scanTimeMs").set(scanTimeMs) metrics("rewriteTimeMs").set(rewriteTimeMs) // In the case where the numUpdatedRows is not captured, we can siphon out the metrics from @@ -243,35 +307,60 @@ case class UpdateCommand( * When CDF is enabled, includes the generation of CDC preimage and postimage columns for * changed rows. * - * @return the list of [[AddFile]]s and [[AddCDCFile]]s that have been written. + * @return a list of [[FileAction]]s, consisting of newly-written data and CDC files and old + * files that have been removed. */ private def rewriteFiles( spark: SparkSession, txn: OptimisticTransaction, rootPath: Path, - inputLeafFiles: Seq[String], + inputLeafFiles: Seq[AddFile], nameToAddFileMap: Map[String, AddFile], - condition: Expression): Seq[FileAction] = { + condition: Expression, + generateRemoveFileActions: Boolean, + copyUnmodifiedRows: Boolean): Seq[FileAction] = { + // Number of total rows that we have seen, i.e. are either copying or updating (sum of both). + // This will be used later, along with numUpdatedRows, to determine numCopiedRows. 
+ val incrTouchedCountExpr = IncrementMetric(TrueLiteral, metrics("numTouchedRows")) + // Containing the map from the relative file path to AddFile val baseRelation = buildBaseRelation( - spark, txn, "update", rootPath, inputLeafFiles, nameToAddFileMap) + spark, txn, "update", rootPath, inputLeafFiles.map(_.path), nameToAddFileMap) val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location) val targetDf = Dataset.ofRows(spark, newTarget) - - // Number of total rows that we have seen, i.e. are either copying or updating (sum of both). - // This will be used later, along with numUpdatedRows, to determine numCopiedRows. - val incrTouchedCountExpr = IncrementMetric(TrueLiteral, metrics("numTouchedRows")) + val targetDfWithEvaluatedCondition = { + val evalDf = targetDf.withColumn(UpdateCommand.CONDITION_COLUMN_NAME, new Column(condition)) + val copyAndUpdateRowsDf = if (copyUnmodifiedRows) { + evalDf + } else { + evalDf.filter(new Column(UpdateCommand.CONDITION_COLUMN_NAME)) + } + copyAndUpdateRowsDf.filter(new Column(incrTouchedCountExpr)) + } val updatedDataFrame = UpdateCommand.withUpdatedColumns( target.output, updateExpressions, condition, - targetDf - .filter(new Column(incrTouchedCountExpr)) - .withColumn(UpdateCommand.CONDITION_COLUMN_NAME, new Column(condition)), + targetDfWithEvaluatedCondition, UpdateCommand.shouldOutputCdc(txn)) - txn.writeFiles(updatedDataFrame) + val addFiles = txn.writeFiles(updatedDataFrame) + + val removeFiles = if (generateRemoveFileActions) { + val operationTimestamp = System.currentTimeMillis() + inputLeafFiles.map(_.removeWithTimestamp(operationTimestamp)) + } else { + Nil + } + + addFiles ++ removeFiles + } + + def shouldWritePersistentDeletionVectors( + spark: SparkSession, txn: OptimisticTransaction): Boolean = { + spark.conf.get(DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS) && + DeletionVectorUtils.deletionVectorsWritable(txn.snapshot) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 830a7bc22c8..879d7561b61 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1244,6 +1244,13 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val UPDATE_USE_PERSISTENT_DELETION_VECTORS = + buildConf("update.deletionVectors.persistent") + .internal() + .doc("Enable persistent Deletion Vectors in the Update command.") + .booleanConf + .createWithDefault(false) + val DELETION_VECTOR_PACKING_TARGET_SIZE = buildConf("deletionVectors.packing.targetSize") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala index bcdfd096563..ca1c76e555f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala @@ -104,7 +104,7 @@ class DeleteSQLWithDeletionVectorsSuite extends DeleteSQLSuite with DeletionVectorsTestUtils { override def beforeAll(): Unit = { super.beforeAll() - enableDeletionVectorsForDeletes(spark) + enableDeletionVectors(spark, delete = true) } override def excluded: Seq[String] = super.excluded ++ diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala index 
ad19165ce45..9c02b4fd0f4 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala @@ -35,13 +35,20 @@ import org.apache.spark.sql.test.SharedSparkSession /** Collection of test utilities related with persistent Deletion Vectors. */ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession { - def enableDeletionVectorsForDeletes(spark: SparkSession, enabled: Boolean = true): Unit = { - val enabledStr = enabled.toString + def enableDeletionVectors( + spark: SparkSession, + delete: Boolean = false, + update: Boolean = false): Unit = { + val global = delete | update spark.conf - .set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, enabledStr) - spark.conf.set(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key, enabledStr) + .set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, global.toString) + spark.conf.set(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key, delete.toString) + spark.conf.set(DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key, update.toString) } + def enableDeletionVectorsForAllSupportedOperations(spark: SparkSession): Unit = + enableDeletionVectors(spark, delete = true, update = true) + def testWithDVs(testName: String, testTags: org.scalatest.Tag*)(thunk: => Unit): Unit = { test(testName, testTags : _*) { withDeletionVectorsEnabled() { @@ -55,7 +62,8 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession { val enabledStr = enabled.toString withSQLConf( DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> enabledStr, - DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr) { + DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr, + DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr) { thunk } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala index ca5bd515eb3..043fe16f8cd 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala @@ -1047,7 +1047,7 @@ class DeltaCDCStreamDeletionVectorSuite extends DeltaCDCStreamSuite with DeletionVectorsTestUtils { override def beforeAll(): Unit = { super.beforeAll() - enableDeletionVectorsForDeletes(spark) + enableDeletionVectorsForAllSupportedOperations(spark) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala index 6c9403b266c..758fa73ab7d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala @@ -996,6 +996,6 @@ class DeltaCDCScalaWithDeletionVectorsSuite extends DeltaCDCScalaSuite with DeletionVectorsTestUtils { override def beforeAll(): Unit = { super.beforeAll() - enableDeletionVectorsForDeletes(spark) + enableDeletionVectorsForAllSupportedOperations(spark) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/UpdateSQLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/UpdateSQLSuite.scala index 2fbb62596c5..260899ccf9c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/UpdateSQLSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/UpdateSQLSuite.scala @@ -17,11 +17,13 @@ package 
org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile} import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.test.{DeltaExcludedTestMixin, DeltaSQLCommandTest} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLType +import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy @@ -158,3 +160,149 @@ class UpdateSQLSuite extends UpdateSuiteBase sql(s"UPDATE $target SET $set $whereClause") } } + +class UpdateSQLWithDeletionVectorsSuite extends UpdateSQLSuite + with DeltaExcludedTestMixin + with DeletionVectorsTestUtils { + override def beforeAll(): Unit = { + super.beforeAll() + enableDeletionVectors(spark, update = true) + } + + override def excluded: Seq[String] = super.excluded ++ + Seq( + // The following two tests must fail when DV is used. Covered by another test case: + // "throw error when non-pinned TahoeFileIndex snapshot is used". + "data and partition predicates - Partition=true Skipping=false", + "data and partition predicates - Partition=false Skipping=false", + // The scan schema contains additional row index filter columns. + "schema pruning on finding files to update", + "nested schema pruning on finding files to update" + ) + + test("repeated UPDATE produces deletion vectors") { + withTempDir { dir => + val path = dir.getCanonicalPath + val log = DeltaLog.forTable(spark, path) + spark.range(0, 10, 1, numPartitions = 2).write.format("delta").save(path) + + // scalastyle:off argcount + def updateAndCheckLog( + where: String, + expectedAnswer: Seq[Row], + + numAddFilesWithDVs: Int, + sumNumRowsInAddFileWithDV: Int, + sumNumRowsInAddFileWithoutDV: Int, + sumDvCardinalityInAddFile: Long, + + numRemoveFilesWithDVs: Int, + sumNumRowsInRemoveFileWithDV: Int, + sumNumRowsInRemoveFileWithoutDV: Int, + sumDvCardinalityInRemoveFile: Long): Unit = { + executeUpdate(s"delta.`$path`", "id = -1", where) + checkAnswer(sql(s"SELECT * FROM delta.`$path`"), expectedAnswer) + + val fileActions = log.getChanges(log.update().version).flatMap(_._2) + .collect { case f: FileAction => f } + .toSeq + val addFiles = fileActions.collect { case f: AddFile => f } + val removeFiles = fileActions.collect { case f: RemoveFile => f } + + val (addFilesWithDV, addFilesWithoutDV) = addFiles.partition(_.deletionVector != null) + assert(addFilesWithDV.size === numAddFilesWithDVs) + assert( + addFilesWithDV.map(_.numPhysicalRecords.getOrElse(0L)).sum === + sumNumRowsInAddFileWithDV) + assert( + addFilesWithDV.map(_.deletionVector.cardinality).sum === + sumDvCardinalityInAddFile) + assert( + addFilesWithoutDV.map(_.numPhysicalRecords.getOrElse(0L)).sum === + sumNumRowsInAddFileWithoutDV) + + val (removeFilesWithDV, removeFilesWithoutDV) = + removeFiles.partition(_.deletionVector != null) + assert(removeFilesWithDV.size === numRemoveFilesWithDVs) + assert( + removeFilesWithDV.map(_.numPhysicalRecords.getOrElse(0L)).sum === + sumNumRowsInRemoveFileWithDV) + assert( + removeFilesWithDV.map(_.deletionVector.cardinality).sum === + sumDvCardinalityInRemoveFile) + assert( + removeFilesWithoutDV.map(_.numPhysicalRecords.getOrElse(0L)).sum === + sumNumRowsInRemoveFileWithoutDV) + } + // scalastyle:on argcount + + // DV created. 4 rows updated. 
+ updateAndCheckLog( + "id % 3 = 0", + Seq(-1, 1, 2, -1, 4, 5, -1, 7, 8, -1).map(Row(_)), + numAddFilesWithDVs = 2, + sumNumRowsInAddFileWithDV = 10, + sumNumRowsInAddFileWithoutDV = 4, + sumDvCardinalityInAddFile = 4, + + numRemoveFilesWithDVs = 0, + sumNumRowsInRemoveFileWithDV = 0, + sumNumRowsInRemoveFileWithoutDV = 10, + sumDvCardinalityInRemoveFile = 0) + + // DV updated. 2 rows from the original file updated. + updateAndCheckLog( + "id % 4 = 0", + Seq(-1, 1, 2, -1, -1, 5, -1, 7, -1, -1).map(Row(_)), + numAddFilesWithDVs = 2, + sumNumRowsInAddFileWithDV = 10, + sumNumRowsInAddFileWithoutDV = 2, + sumDvCardinalityInAddFile = 6, + numRemoveFilesWithDVs = 2, + sumNumRowsInRemoveFileWithDV = 10, + sumNumRowsInRemoveFileWithoutDV = 0, + sumDvCardinalityInRemoveFile = 4) + + // Original file's DV removed, because all rows in the second file are deleted. + updateAndCheckLog( + "id IN (5, 7)", + Seq(-1, 1, 2, -1, -1, -1, -1, -1, -1, -1).map(Row(_)), + numAddFilesWithDVs = 0, + sumNumRowsInAddFileWithDV = 0, + sumNumRowsInAddFileWithoutDV = 2, + sumDvCardinalityInAddFile = 0, + numRemoveFilesWithDVs = 1, + sumNumRowsInRemoveFileWithDV = 5, + sumNumRowsInRemoveFileWithoutDV = 0, + sumDvCardinalityInRemoveFile = 3) + } + } + + test("UPDATE a whole partition does not produce DVs") { + withTempDir { dir => + val path = dir.getCanonicalPath + val log = DeltaLog.forTable(spark, path) + spark.range(10).withColumn("part", col("id") % 2) + .write + .format("delta") + .partitionBy("part") + .save(path) + + executeUpdate(s"delta.`$path`", "id = -1", where = "part = 0") + checkAnswer( + sql(s"SELECT * FROM delta.`$path`"), + Row(-1, 0) :: Row(1, 1) :: Row(-1, 0) :: + Row(3, 1) :: Row(-1, 0) :: Row(5, 1) :: Row(-1, 0) :: + Row(7, 1) :: Row(-1, 0) :: Row(9, 1) :: Nil) + + val fileActions = log.getChanges(log.update().version).flatMap(_._2) + .collect { case f: FileAction => f } + .toSeq + val addFiles = fileActions.collect { case f: AddFile => f } + val removeFiles = fileActions.collect { case f: RemoveFile => f } + assert(addFiles.map(_.numPhysicalRecords.getOrElse(0L)).sum === 5) + assert(removeFiles.map(_.numPhysicalRecords.getOrElse(0L)).sum === 5) + for (a <- addFiles) assert(a.deletionVector === null) + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/cdc/UpdateCDCSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/cdc/UpdateCDCSuite.scala index 9e88251d0cb..d2767e2a0f4 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/cdc/UpdateCDCSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/cdc/UpdateCDCSuite.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.delta.cdc // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, RemoveFile} import org.apache.spark.sql.delta.commands.cdc.CDCReader +import org.apache.spark.sql.delta.test.DeltaExcludedTestMixin import org.apache.spark.sql.delta.test.DeltaTestImplicits._ import org.apache.spark.SparkConf @@ -184,3 +186,54 @@ class UpdateCDCSuite extends UpdateSQLSuite with DeltaColumnMappingTestUtils { } } +class UpdateCDCWithDeletionVectorsSuite extends UpdateCDCSuite + with DeltaExcludedTestMixin + with DeletionVectorsTestUtils { + override def beforeAll(): Unit = { + super.beforeAll() + enableDeletionVectors(spark, update = true) + } + + override def excluded: Seq[String] = super.excluded ++ + Seq( + // The following two tests must fail when DV is used.
Covered by another test case: + // "throw error when non-pinned TahoeFileIndex snapshot is used". + "data and partition predicates - Partition=true Skipping=false", + "data and partition predicates - Partition=false Skipping=false", + // The scan schema contains additional row index filter columns. + "schema pruning on finding files to update", + "nested schema pruning on finding files to update" + ) + + test("UPDATE with DV writes CDC files explicitly") { + withTempDir { dir => + val path = dir.getCanonicalPath + val log = DeltaLog.forTable(spark, path) + spark.range(0, 10, 1, numPartitions = 2).write.format("delta").save(path) + executeUpdate(s"delta.`$path`", "id = -1", "id % 4 = 0") + + val latestVersion = log.update().version + checkAnswer( + CDCReader + .changesToBatchDF(log, latestVersion, latestVersion, spark) + .drop(CDCReader.CDC_COMMIT_TIMESTAMP), + Row(0, "update_preimage", latestVersion) :: + Row(-1, "update_postimage", latestVersion) :: + Row(4, "update_preimage", latestVersion) :: + Row(-1, "update_postimage", latestVersion) :: + Row(8, "update_preimage", latestVersion) :: + Row(-1, "update_postimage", latestVersion) :: + Nil) + + val allActions = log.getChanges(latestVersion).flatMap(_._2).toSeq + val addActions = allActions.collect { case f: AddFile => f } + val removeActions = allActions.collect { case f: RemoveFile => f } + val cdcActions = allActions.collect { case f: AddCDCFile => f } + + assert(addActions.count(_.deletionVector != null) === 2) + assert(removeActions.size === 2) + assert(cdcActions.nonEmpty) + } + } +} +
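Note (not part of the patch): below is a minimal end-to-end sketch of how the new UPDATE-with-DV path could be exercised, assuming a local SparkSession with delta-spark on the classpath. The object name and scratch path are illustrative, and the fully qualified conf key is an assumption derived from buildConf("update.deletionVectors.persistent") plus the usual spark.databricks.delta. prefix; the commit inspection mirrors the new test code above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.AddFile

object UpdateWithDeletionVectorsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("update-with-dv-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      // Assumed full key for the new UPDATE_USE_PERSISTENT_DELETION_VECTORS flag
      // ("update.deletionVectors.persistent" under the spark.databricks.delta. prefix).
      .config("spark.databricks.delta.update.deletionVectors.persistent", "true")
      .getOrCreate()

    val path = "/tmp/update_dv_demo" // hypothetical scratch location
    spark.range(0, 10, 1, 2).write.format("delta").mode("overwrite").save(path)
    // DVs must also be writable on the table itself for the new gating check to pass.
    spark.sql(
      s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")

    // With both switches on, shouldWritePersistentDeletionVectors returns true, so the UPDATE
    // masks the old copies of the matched rows with DVs instead of rewriting unmodified rows.
    spark.sql(s"UPDATE delta.`$path` SET id = -1 WHERE id % 3 = 0")

    // Inspect the commit the same way the new tests do: AddFiles carrying a deletionVector
    // indicate the DV path was taken.
    val log = DeltaLog.forTable(spark, path)
    val actions = log.getChanges(log.update().version).flatMap(_._2).toSeq
    val addsWithDVs = actions.collect { case a: AddFile if a.deletionVector != null => a }
    println(s"AddFiles carrying DVs: ${addsWithDVs.size}")

    spark.stop()
  }
}

If CDF is enabled on the table, the same commit additionally contains explicit AddCDCFile actions, which is what UpdateCDCWithDeletionVectorsSuite asserts above.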