Populate metrics when running DELETE on a partitioned column/field
Export the numDeletedRows metric in DeleteCommand's Case 1 and Case 2, so that numRowsAffected is populated when a user runs a DELETE against a partition key or against the whole table.

GitOrigin-RevId: f5af1c495a49d344a73c405c7d8582c7a2ed7160
carlfu-db authored and vkorukanti committed Oct 26, 2022
1 parent 7e9ec8d commit 2118e64
Showing 6 changed files with 104 additions and 24 deletions.
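As a quick illustration of the effect (a hedged sketch; the table and column names here are made up), a DELETE on a partition column now reports real row counts instead of -1:

    import spark.implicits._

    // A partitioned Delta table with 10 rows per partition (hypothetical example).
    spark.range(100).withColumn("part", $"id" % 10)
      .write.format("delta").partitionBy("part").saveAsTable("events")

    // Metadata-only DELETE on the partition column.
    val result = spark.sql("DELETE FROM events WHERE part = 1")
    // With dmlMetricsFromMetadata enabled (the default), numRowsAffected is 10
    // rather than -1, and DESCRIBE HISTORY shows numDeletedRows = 10.
    result.show()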
DeltaOperations.scala:

@@ -489,19 +489,27 @@ private[delta] object DeltaOperationMetrics {
"numOutputBytes", // size in bytes of the written contents
"numOutputRows", // number of rows written
"numAddedChangeFiles", // number of CDC files
"numRemovedFiles" // number of files removed
"numRemovedFiles", // number of files removed
// Records below only exist when DELTA_DML_METRICS_FROM_METADATA is enabled
"numCopiedRows", // number of rows copied
"numDeletedRows" // number of rows deleted
)

/**
* Deleting the entire table or partition would prevent row level metrics from being recorded.
* This is used only in test to verify specific delete cases.
* Deleting the entire table or partition will record row level metrics when
* DELTA_DML_METRICS_FROM_METADATA is enabled
* * DELETE_PARTITIONS is used only in test to verify specific delete cases.
*/
val DELETE_PARTITIONS = Set(
"numRemovedFiles", // number of files removed
"numAddedChangeFiles", // number of CDC files generated - generally 0 in this case
"executionTimeMs", // time taken to execute the entire operation
"scanTimeMs", // time taken to scan the files for matches
"rewriteTimeMs" // time taken to rewrite the matched files
"rewriteTimeMs", // time taken to rewrite the matched files
// Records below only exist when DELTA_DML_METRICS_FROM_METADATA is enabled
"numCopiedRows", // number of rows copied
"numDeletedRows", // number of rows deleted
"numAddedFiles" // number of files added
)


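These key sets act as an allow-list for the operation metrics that may surface in a table's history. A minimal sketch of how a test might use the extended set (a hypothetical snippet reusing the getOperationMetrics helper that appears in the test changes below):

    // For a metadata-only DELETE, every recorded metric key should be one of
    // the keys declared in DELETE_PARTITIONS.
    val recorded = getOperationMetrics(deltaTable.history(1))
    assert(recorded.keySet.subsetOf(DeltaOperationMetrics.DELETE_PARTITIONS))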
OptimisticTransaction.scala:

@@ -538,6 +538,19 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     scan.files
   }

+  /** Same as filterFiles but makes sure that the stats contain at least the numRecords field. */
+  def filterFilesWithNumRecords(filters: Seq[Expression]): Seq[AddFile] = {
+    val scan = snapshot.filesForScan(
+      filters = filters,
+      keepNumRecords = true)
+    val partitionFilters = filters.filter { f =>
+      DeltaTableUtils.isPredicatePartitionColumnsOnly(f, metadata.partitionColumns, spark)
+    }
+    readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true))
+    readFiles ++= scan.files
+    scan.files
+  }
+
   /** Returns files within the given partitions. */
   def filterFiles(partitions: Set[Map[String, String]]): Seq[AddFile] = {
     import org.apache.spark.sql.functions.{array, col}
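A minimal usage sketch for the new helper (assuming a transaction txn and a Seq[Expression] of resolved partition predicates named filters are already in scope): because filesForScan is called with keepNumRecords = true, each returned AddFile is expected to expose its row count through numLogicalRecords:

    val files: Seq[AddFile] = txn.filterFilesWithNumRecords(filters)
    // Row-level metrics can then be derived from stats alone, without a data scan.
    val totalRows: Option[Long] =
      if (files.nonEmpty && files.forall(_.numLogicalRecords.nonEmpty)) {
        Some(files.flatMap(_.numLogicalRecords).sum)
      } else {
        None
      }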
DeleteCommand.scala:

@@ -17,10 +17,11 @@
 package org.apache.spark.sql.delta.commands

 import org.apache.spark.sql.delta._
-import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, FileAction}
+import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, FileAction}
 import org.apache.spark.sql.delta.commands.DeleteCommand.{rewritingFilesMsg, FINDING_TOUCHED_FILES_MSG}
 import org.apache.spark.sql.delta.commands.MergeIntoCommand.totalBytesAndDistinctPartitionValues
 import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize

 import org.apache.spark.SparkContext
@@ -63,6 +64,26 @@ trait DeleteCommandMetrics { self: LeafRunnableCommand =>
"changeFileBytes" -> createMetric(sc, "total size of change data capture files generated"),
"numTouchedRows" -> createMetric(sc, "number of rows touched")
)

def getDeletedRowsFromAddFilesAndUpdateMetrics(files: Seq[AddFile]) : Option[Long] = {
if (!conf.getConf(DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA)) {
return None;
}
// No file to get metadata, return none to be consistent with metadata stats disabled
if (files.isEmpty) {
return None
}
// Return None if any file does not contain numLogicalRecords status
var count: Long = 0
for (file <- files) {
if (file.numLogicalRecords.isEmpty) {
return None
}
count += file.numLogicalRecords.get
}
metrics("numDeletedRows").set(count)
return Some(count)
}
}

/**
@@ -142,7 +163,12 @@ case class DeleteCommand(
     val deleteActions: Seq[Action] = condition match {
       case None =>
         // Case 1: Delete the whole table if the condition is true
-        val allFiles = txn.filterFiles(Nil)
+        val allFiles =
+          if (conf.getConf(DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA)) {
+            txn.filterFilesWithNumRecords(Nil)
+          } else {
+            txn.filterFiles(Nil)
+          }

         numRemovedFiles = allFiles.size
         scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
@@ -152,6 +178,8 @@
         numBytesBeforeSkipping = numBytes
         numFilesAfterSkipping = numRemovedFiles
         numBytesAfterSkipping = numBytes
+        numDeletedRows = getDeletedRowsFromAddFilesAndUpdateMetrics(allFiles)
+
         if (txn.metadata.partitionColumns.nonEmpty) {
           numPartitionsAfterSkipping = Some(numPartitions)
           numPartitionsRemovedFrom = Some(numPartitions)
@@ -171,7 +199,12 @@
         // Case 2: The condition can be evaluated using metadata only.
         // Delete a set of files without the need of scanning any data files.
         val operationTimestamp = System.currentTimeMillis()
-        val candidateFiles = txn.filterFiles(metadataPredicates)
+        val candidateFiles =
+          if (conf.getConf(DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA)) {
+            txn.filterFilesWithNumRecords(metadataPredicates)
+          } else {
+            txn.filterFiles(metadataPredicates)
+          }

         scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
         numRemovedFiles = candidateFiles.size
@@ -180,6 +213,8 @@
         val (numCandidateBytes, numCandidatePartitions) =
           totalBytesAndDistinctPartitionValues(candidateFiles)
         numBytesAfterSkipping = numCandidateBytes
+        numDeletedRows = getDeletedRowsFromAddFilesAndUpdateMetrics(candidateFiles)
+
         if (txn.metadata.partitionColumns.nonEmpty) {
           numPartitionsAfterSkipping = Some(numCandidatePartitions)
           numPartitionsRemovedFrom = Some(numCandidatePartitions)
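The early-return loop in getDeletedRowsFromAddFilesAndUpdateMetrics is equivalent to this more functional sketch (an illustration with a hypothetical name, not the committed code): the count is defined only when the conf is on, at least one file was selected, and every file carries the numLogicalRecords stat:

    def deletedRowsFromStats(files: Seq[AddFile], statsEnabled: Boolean): Option[Long] = {
      if (!statsEnabled || files.isEmpty || files.exists(_.numLogicalRecords.isEmpty)) {
        None // stay consistent with the behavior when stats are unavailable
      } else {
        Some(files.flatMap(_.numLogicalRecords).sum)
      }
    }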
DeltaSQLConf.scala:

@@ -58,6 +58,16 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)

+  val DELTA_DML_METRICS_FROM_METADATA =
+    buildConf("dmlMetricsFromMetadata.enabled")
+      .internal()
+      .doc(
+        """When enabled, metadata-only Delete, ReplaceWhere and Truncate operations will report
+          |row-level operation metrics by reading the file statistics for number of rows.
+          |""".stripMargin)
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_COLLECT_STATS_USING_TABLE_SCHEMA =
     buildConf("stats.collect.using.tableSchema")
       .internal()
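Since the conf is internal and defaults to true, the old behavior can be restored per session by flipping the flag (assuming Delta's usual spark.databricks.delta. prefix that buildConf applies to conf keys):

    // Disable stats-based DML metrics for this session.
    spark.conf.set("spark.databricks.delta.dmlMetricsFromMetadata.enabled", "false")
    // Metadata-only DELETEs then report -1 affected rows again and omit
    // numDeletedRows, as exercised by DeleteMetricsSuite below.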
DeleteMetricsSuite.scala:

@@ -204,19 +204,31 @@ class DeleteMetricsSuite extends QueryTest
   import testImplicits._

   Seq(true, false).foreach { cdfEnabled =>
-    withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> cdfEnabled.toString) {
-      withTable("t1") {
-        spark.range(100).withColumn("part", 'id % 10).toDF().write
-          .partitionBy("part").format("delta").saveAsTable("t1")
-        val result = spark.sql("DELETE FROM t1 WHERE part=1").take(1).head(0).toString.toLong
-        val opMetrics = DeltaMetricsUtils.getLastOperationMetrics("t1")
-
-        // This is a metadata operation. We expect the result (i.e. numAffectedRows) to be -1 and
-        // the operation metric for `numDeletedRows` not to exist. This metric is filtered out
-        // explicitly inside of [[DeltaOperations.Delete.transformMetrics]].
-        assert(opMetrics("numRemovedFiles") > 0)
-        assert(!opMetrics.contains("numDeletedRows"))
-        assert(result == -1)
+    Seq(true, false).foreach { deltaCollectStatsEnabled =>
+      Seq(true, false).foreach { deltaDmlMetricsFromMetadataEnabled =>
+        withSQLConf(
+          DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> cdfEnabled.toString,
+          DeltaSQLConf.DELTA_COLLECT_STATS.key -> deltaCollectStatsEnabled.toString,
+          DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA.key
+            -> deltaDmlMetricsFromMetadataEnabled.toString
+        ) {
+          withTable("t1") {
+            spark.range(100).withColumn("part", 'id % 10).toDF().write
+              .partitionBy("part").format("delta").saveAsTable("t1")
+            val result = spark.sql("DELETE FROM t1 WHERE part=1")
+              .take(1).head(0).toString.toLong
+            val opMetrics = DeltaMetricsUtils.getLastOperationMetrics("t1")
+
+            assert(opMetrics("numRemovedFiles") > 0)
+            if (deltaCollectStatsEnabled && deltaDmlMetricsFromMetadataEnabled) {
+              assert(opMetrics("numDeletedRows") == 10)
+              assert(result == 10)
+            } else {
+              assert(!opMetrics.contains("numDeletedRows"))
+              assert(result == -1)
+            }
+          }
+        }
+      }
     }
   }
 }
@@ -261,10 +273,10 @@ class DeleteMetricsSuite extends QueryTest
     runDeleteAndCheckMetrics(
       table = spark.range(start = 0, end = 100, step = 1, numPartitions = 5),
       where = whereClause,
-      expectedNumAffectedRows = -1L,
+      expectedNumAffectedRows = 100,
       expectedOperationMetrics = Map(
         "numCopiedRows" -> -1,
-        "numDeletedRows" -> -1,
+        "numDeletedRows" -> 100,
         "numOutputRows" -> -1,
         "numFiles" -> -1,
         "numAddedFiles" -> -1,
DescribeDeltaHistorySuite.scala:

@@ -1157,8 +1157,10 @@ trait DescribeDeltaHistorySuiteBase
"numFiles" -> "2",
"numOutputRows" -> "20",
"numAddedChangeFiles" -> "0",
"numRemovedFiles" -> "1"
),
"numRemovedFiles" -> "1",
"numCopiedRows" -> "0",
"numDeletedRows" -> "10"
),
getOperationMetrics(deltaTable.history(1)),
DeltaOperationMetrics.WRITE_REPLACE_WHERE_PARTITIONS
)