From fd503d80328bec38a274ea36f99c2ba68e64f8ad Mon Sep 17 00:00:00 2001
From: Rajesh Parangi
Date: Thu, 27 Oct 2022 12:24:38 -0700
Subject: [PATCH] Add additional metrics to vacuum stats.

This change adds the following three metrics to the vacuum stats:
1. sizeOfDataToDelete
2. timeTakenToIdentifyEligibleFiles
3. timeTakenForDelete

GitOrigin-RevId: 34a0e40cbe1049a219d8dfcd576631f7b6ed4aa2
---
 .../sql/delta/commands/VacuumCommand.scala    | 74 +++++++++++++------
 1 file changed, 53 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 624350ad381..fa5b5ec554c 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -32,8 +32,8 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.functions.{col, count, sum}
 import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
 
 /**
@@ -47,6 +47,7 @@ import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
  */
 object VacuumCommand extends VacuumCommandImpl with Serializable {
 
+  case class FileNameAndSize(path: String, length: Long)
   /**
    * Additional check on retention duration to prevent people from shooting themselves in the foot.
    */
@@ -130,6 +131,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
     val relativizeIgnoreError =
       spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
 
+    val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()
     val validFiles = snapshot.stateDS
       .mapPartitions { actions =>
         val reservoirBase = new Path(basePath)
@@ -171,6 +173,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
 
     try {
       allFilesAndDirs.cache()
+      implicit val fileNameAndSizeEncoder = org.apache.spark.sql.Encoders.product[FileNameAndSize]
+
       val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
 
       // The logic below is as follows:
@@ -188,21 +192,32 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
           val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
           fileStatusIterator.flatMap { fileStatus =>
             if (fileStatus.isDir) {
-              Iterator.single(
-                relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true))
+              Iterator.single(FileNameAndSize(
+                relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
             } else {
               val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
               val dirsWithSlash = dirs.map { p =>
-                relativize(new Path(p), fs, reservoirBase, isDir = true)
+                val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
+                FileNameAndSize(relativizedPath, 0L)
               }
               dirsWithSlash ++ Iterator(
-                relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = false))
+                FileNameAndSize(relativize(
+                  fileStatus.getHadoopPath, fs, reservoirBase, isDir = false), fileStatus.length))
             }
           }
-        }.groupBy(col("value").as("path"))
-        .count()
+        }.groupBy(col("path")).agg(count(new Column("*")).as("count"), sum("length").as("length"))
         .join(validFiles, Seq("path"), "leftanti")
         .where(col("count") === 1)
+
+
+      val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first
+      val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
+        0L
+      } else {
+        sizeOfDataToDeleteRow.getLong(0)
+      }
+
+      val diffFiles = diff
         .select(col("path"))
         .as[String]
         .map { relativePath =>
@@ -210,45 +225,57 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
             "Shouldn't have any absolute paths for deletion here.")
           pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
         }
+      val timeTakenToIdentifyEligibleFiles =
+        System.currentTimeMillis() - startTimeToIdentifyEligibleFiles
       if (dryRun) {
-        val numFiles = diff.count()
+        val numFiles = diffFiles.count()
         val stats = DeltaVacuumStats(
           isDryRun = true,
           specifiedRetentionMillis = retentionMillis,
           defaultRetentionMillis = deltaLog.tombstoneRetentionMillis,
           minRetainedTimestamp = deleteBeforeTimestamp,
           dirsPresentBeforeDelete = dirCounts,
-          objectsDeleted = numFiles)
+          objectsDeleted = numFiles,
+          sizeOfDataToDelete = sizeOfDataToDelete,
+          timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
+          timeTakenForDelete = 0L)
 
         recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
         logConsole(s"Found $numFiles files and directories in a total of " +
-          s"$dirCounts directories that are safe to delete.")
+          s"$dirCounts directories that are safe to delete.$stats")
 
-        return diff.map(f => stringToPath(f).toString).toDF("path")
+        return diffFiles.map(f => stringToPath(f).toString).toDF("path")
       }
       logVacuumStart(
         spark,
         deltaLog,
         path,
-        diff,
+        diffFiles,
+        sizeOfDataToDelete,
         retentionMillis,
         deltaLog.tombstoneRetentionMillis)
 
+      val deleteStartTime = System.currentTimeMillis()
       val filesDeleted = try {
-        delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled,
-          parallelDeletePartitions)
-      } catch { case t: Throwable =>
-        logVacuumEnd(deltaLog, spark, path)
-        throw t
+        delete(diffFiles, spark, basePath,
+          hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
+      } catch {
+        case t: Throwable =>
+          logVacuumEnd(deltaLog, spark, path)
+          throw t
       }
+      val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime
 
       val stats = DeltaVacuumStats(
         isDryRun = false,
         specifiedRetentionMillis = retentionMillis,
         defaultRetentionMillis = deltaLog.tombstoneRetentionMillis,
         minRetainedTimestamp = deleteBeforeTimestamp,
         dirsPresentBeforeDelete = dirCounts,
-        objectsDeleted = filesDeleted)
+        objectsDeleted = filesDeleted,
+        sizeOfDataToDelete = sizeOfDataToDelete,
+        timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
+        timeTakenForDelete = timeTakenForDelete)
 
       recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
       logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
@@ -267,9 +294,11 @@ trait VacuumCommandImpl extends DeltaCommand {
       deltaLog: DeltaLog,
       path: Path,
       diff: Dataset[String],
+      sizeOfDataToDelete: Long,
       specifiedRetentionMillis: Option[Long],
       defaultRetentionMillis: Long): Unit = {
-    logInfo(s"Deleting untracked files and empty directories in $path")
+    logInfo(s"Deleting untracked files and empty directories in $path. The amount of data to be " +
+      s"deleted is $sizeOfDataToDelete (in bytes)")
   }
 
   protected def logVacuumEnd(
@@ -379,4 +408,7 @@ case class DeltaVacuumStats(
     defaultRetentionMillis: Long,
     minRetainedTimestamp: Long,
     dirsPresentBeforeDelete: Long,
-    objectsDeleted: Long)
+    objectsDeleted: Long,
+    sizeOfDataToDelete: Long,
+    timeTakenToIdentifyEligibleFiles: Long,
+    timeTakenForDelete: Long)
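
Editorial note, not part of the patch: the new sizeOfDataToDelete metric relies on sum("length") returning null over an empty input, which is why the patch reads the aggregate row with isNullAt(0) before calling getLong(0). Below is a minimal, self-contained Scala sketch of that null-safe aggregation pattern; the object name SizeMetricSketch and the candidates dataset are illustrative stand-ins, not code from the patch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Hypothetical standalone example; mirrors the null-safe size aggregation in the patch.
object SizeMetricSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("size-metric-sketch")
      .getOrCreate()
    import spark.implicits._

    // Illustrative stand-in for the vacuum candidates: directories carry length 0L,
    // files carry their on-disk size, as with FileNameAndSize in the patch.
    val candidates = Seq(
      ("date=2022-10-27/part-00000.parquet", 1024L),
      ("date=2022-10-27/", 0L)
    ).toDF("path", "length")

    // sum("length") yields null when the input is empty, so read the row
    // defensively, exactly as the patch does with isNullAt(0) / getLong(0).
    val row = candidates.agg(sum("length").cast("long")).first
    val sizeOfDataToDelete = if (row.isNullAt(0)) 0L else row.getLong(0)

    println(s"sizeOfDataToDelete = $sizeOfDataToDelete bytes")
    spark.stop()
  }
}

With the sample rows this prints sizeOfDataToDelete = 1024 bytes; on an empty dataset the isNullAt(0) guard keeps the metric at 0L instead of failing on the null aggregate.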