diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala index 7252ce663a6..7f241d1e242 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala @@ -489,19 +489,27 @@ private[delta] object DeltaOperationMetrics { "numOutputBytes", // size in bytes of the written contents "numOutputRows", // number of rows written "numAddedChangeFiles", // number of CDC files - "numRemovedFiles" // number of files removed + "numRemovedFiles", // number of files removed + // Records below only exist when DELTA_DML_METRICS_FROM_METADATA is enabled + "numCopiedRows", // number of rows copied + "numDeletedRows" // number of rows deleted ) /** - * Deleting the entire table or partition would prevent row level metrics from being recorded. - * This is used only in test to verify specific delete cases. + * Deleting the entire table or partition will record row level metrics when + * DELTA_DML_METRICS_FROM_METADATA is enabled + * * DELETE_PARTITIONS is used only in test to verify specific delete cases. */ val DELETE_PARTITIONS = Set( "numRemovedFiles", // number of files removed "numAddedChangeFiles", // number of CDC files generated - generally 0 in this case "executionTimeMs", // time taken to execute the entire operation "scanTimeMs", // time taken to scan the files for matches - "rewriteTimeMs" // time taken to rewrite the matched files + "rewriteTimeMs", // time taken to rewrite the matched files + // Records below only exist when DELTA_DML_METRICS_FROM_METADATA is enabled + "numCopiedRows", // number of rows copied + "numDeletedRows", // number of rows deleted + "numAddedFiles" // number of files added ) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 6fda24778f2..1c9657ba9a1 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -538,6 +538,19 @@ trait OptimisticTransactionImpl extends TransactionalWrite scan.files } + /** Same as filterFiles but makes sure that the stats contain at least the numRecords field. */ + def filterFilesWithNumRecords(filters: Seq[Expression]): Seq[AddFile] = { + val scan = snapshot.filesForScan( + filters = filters, + keepNumRecords = true) + val partitionFilters = filters.filter { f => + DeltaTableUtils.isPredicatePartitionColumnsOnly(f, metadata.partitionColumns, spark) + } + readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true)) + readFiles ++= scan.files + scan.files + } + /** Returns files within the given partitions. */ def filterFiles(partitions: Set[Map[String, String]]): Seq[AddFile] = { import org.apache.spark.sql.functions.{array, col} diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala index badfbfe9664..f398216a2b8 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.delta.commands import org.apache.spark.sql.delta._ -import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, FileAction} +import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, FileAction} import org.apache.spark.sql.delta.commands.DeleteCommand.{rewritingFilesMsg, FINDING_TOUCHED_FILES_MSG} import org.apache.spark.sql.delta.commands.MergeIntoCommand.totalBytesAndDistinctPartitionValues import org.apache.spark.sql.delta.files.TahoeBatchFileIndex +import org.apache.spark.sql.delta.sources.DeltaSQLConf import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.SparkContext @@ -63,6 +64,26 @@ trait DeleteCommandMetrics { self: LeafRunnableCommand => "changeFileBytes" -> createMetric(sc, "total size of change data capture files generated"), "numTouchedRows" -> createMetric(sc, "number of rows touched") ) + + def getDeletedRowsFromAddFilesAndUpdateMetrics(files: Seq[AddFile]) : Option[Long] = { + if (!conf.getConf(DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA)) { + return None; + } + // No file to get metadata, return none to be consistent with metadata stats disabled + if (files.isEmpty) { + return None + } + // Return None if any file does not contain numLogicalRecords status + var count: Long = 0 + for (file <- files) { + if (file.numLogicalRecords.isEmpty) { + return None + } + count += file.numLogicalRecords.get + } + metrics("numDeletedRows").set(count) + return Some(count) + } } /** @@ -142,7 +163,12 @@ case class DeleteCommand( val deleteActions: Seq[Action] = condition match { case None => // Case 1: Delete the whole table if the condition is true - val allFiles = txn.filterFiles(Nil) + val allFiles = + if (conf.getConf(DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA)) { + txn.filterFilesWithNumRecords(Nil) + } else { + txn.filterFiles(Nil) + } numRemovedFiles = allFiles.size scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 @@ -152,6 +178,8 @@ case class DeleteCommand( numBytesBeforeSkipping = numBytes numFilesAfterSkipping = numRemovedFiles numBytesAfterSkipping = numBytes + numDeletedRows = getDeletedRowsFromAddFilesAndUpdateMetrics(allFiles) + if (txn.metadata.partitionColumns.nonEmpty) { numPartitionsAfterSkipping = Some(numPartitions) numPartitionsRemovedFrom = Some(numPartitions) @@ -171,7 +199,12 @@ case class DeleteCommand( // Case 2: The condition can be evaluated using metadata only. // Delete a set of files without the need of scanning any data files. val operationTimestamp = System.currentTimeMillis() - val candidateFiles = txn.filterFiles(metadataPredicates) + val candidateFiles = + if (conf.getConf(DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA)) { + txn.filterFilesWithNumRecords(metadataPredicates) + } else { + txn.filterFiles(metadataPredicates) + } scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 numRemovedFiles = candidateFiles.size @@ -180,6 +213,8 @@ case class DeleteCommand( val (numCandidateBytes, numCandidatePartitions) = totalBytesAndDistinctPartitionValues(candidateFiles) numBytesAfterSkipping = numCandidateBytes + numDeletedRows = getDeletedRowsFromAddFilesAndUpdateMetrics(candidateFiles) + if (txn.metadata.partitionColumns.nonEmpty) { numPartitionsAfterSkipping = Some(numCandidatePartitions) numPartitionsRemovedFrom = Some(numCandidatePartitions) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 32d7a2f43f1..6c77404d601 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -58,6 +58,16 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val DELTA_DML_METRICS_FROM_METADATA = + buildConf("dmlMetricsFromMetadata.enabled") + .internal() + .doc( + """ When enabled, metadata only Delete, ReplaceWhere and Truncate operations will report row + | level operation metrics by reading the file statistics for number of rows. + | """.stripMargin) + .booleanConf + .createWithDefault(true) + val DELTA_COLLECT_STATS_USING_TABLE_SCHEMA = buildConf("stats.collect.using.tableSchema") .internal() diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeleteMetricsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeleteMetricsSuite.scala index 6c5b1259f87..afccf02d3fa 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeleteMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeleteMetricsSuite.scala @@ -204,19 +204,31 @@ class DeleteMetricsSuite extends QueryTest import testImplicits._ Seq(true, false).foreach { cdfEnabled => - withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> cdfEnabled.toString) { - withTable("t1") { - spark.range(100).withColumn("part", 'id % 10).toDF().write - .partitionBy("part").format("delta").saveAsTable("t1") - val result = spark.sql("DELETE FROM t1 WHERE part=1").take(1).head(0).toString.toLong - val opMetrics = DeltaMetricsUtils.getLastOperationMetrics("t1") - - // This is a metadata operation. We expect the result (i.e. numAffectedRows) to be -1 and - // the operation metric for `numDeletedRows` not to exist. This metric is filtered out - // explicitly inside of [[DeltaOperations.Delete.transformMetrics]]. - assert(opMetrics("numRemovedFiles") > 0) - assert(!opMetrics.contains("numDeletedRows")) - assert(result == -1) + Seq(true, false).foreach { deltaCollectStatsEnabled => + Seq(true, false).foreach { deltaDmlMetricsFromMetadataEnabled => + withSQLConf( + DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> cdfEnabled.toString, + DeltaSQLConf.DELTA_COLLECT_STATS.key -> deltaCollectStatsEnabled.toString, + DeltaSQLConf.DELTA_DML_METRICS_FROM_METADATA.key + -> deltaDmlMetricsFromMetadataEnabled.toString + ) { + withTable("t1") { + spark.range(100).withColumn("part", 'id % 10).toDF().write + .partitionBy("part").format("delta").saveAsTable("t1") + val result = spark.sql("DELETE FROM t1 WHERE part=1") + .take(1).head(0).toString.toLong + val opMetrics = DeltaMetricsUtils.getLastOperationMetrics("t1") + + assert(opMetrics("numRemovedFiles") > 0) + if (deltaCollectStatsEnabled && deltaDmlMetricsFromMetadataEnabled) { + assert(opMetrics("numDeletedRows") == 10) + assert(result == 10) + } else { + assert(!opMetrics.contains("numDeletedRows")) + assert(result == -1) + } + } + } } } } @@ -261,10 +273,10 @@ class DeleteMetricsSuite extends QueryTest runDeleteAndCheckMetrics( table = spark.range(start = 0, end = 100, step = 1, numPartitions = 5), where = whereClause, - expectedNumAffectedRows = -1L, + expectedNumAffectedRows = 100, expectedOperationMetrics = Map( "numCopiedRows" -> -1, - "numDeletedRows" -> -1, + "numDeletedRows" -> 100, "numOutputRows" -> -1, "numFiles" -> -1, "numAddedFiles" -> -1, diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala index 40c93a29264..2b470dc6ddc 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala @@ -1157,8 +1157,10 @@ trait DescribeDeltaHistorySuiteBase "numFiles" -> "2", "numOutputRows" -> "20", "numAddedChangeFiles" -> "0", - "numRemovedFiles" -> "1" - ), + "numRemovedFiles" -> "1", + "numCopiedRows" -> "0", + "numDeletedRows" -> "10" + ), getOperationMetrics(deltaTable.history(1)), DeltaOperationMetrics.WRITE_REPLACE_WHERE_PARTITIONS )