Add additional metrics to vacuum stats.
This change adds the following three metrics to the vacuum stats (a brief sketch of the measurement pattern follows the file summary below):
1. sizeOfDataToDelete
2. timeTakenToIdentifyEligibleFiles
3. timeTakenForDelete

GitOrigin-RevId: 34a0e40cbe1049a219d8dfcd576631f7b6ed4aa2
rajeshparangi authored and scottsand-db committed Nov 9, 2022
1 parent 529313d commit fd503d8
Showing 1 changed file with 53 additions and 21 deletions.
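
Before the diff itself, a minimal sketch of the measurement pattern the patch applies. The two duration metrics are plain wall-clock brackets around vacuum's two phases, and sizeOfDataToDelete is a sum of candidate file lengths. Here identifyEligibleFiles and delete are hypothetical stand-ins for the real scan and delete code shown in the diff:

    // Hypothetical condensation of the pattern; both durations are in milliseconds.
    val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()
    val diffFiles = identifyEligibleFiles() // stand-in for the state scan below
    val timeTakenToIdentifyEligibleFiles =
      System.currentTimeMillis() - startTimeToIdentifyEligibleFiles

    val deleteStartTime = System.currentTimeMillis()
    val filesDeleted = delete(diffFiles) // stand-in for the parallel delete below
    val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime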
@@ -32,8 +32,8 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.functions.{col, count, sum}
 import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
 
 /**
@@ -47,6 +47,7 @@ import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
  */
 object VacuumCommand extends VacuumCommandImpl with Serializable {
 
+  case class FileNameAndSize(path: String, length: Long)
   /**
    * Additional check on retention duration to prevent people from shooting themselves in the foot.
    */
@@ -130,6 +131,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
     val relativizeIgnoreError =
       spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
 
+    val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()
     val validFiles = snapshot.stateDS
       .mapPartitions { actions =>
         val reservoirBase = new Path(basePath)
@@ -171,6 +173,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
     try {
       allFilesAndDirs.cache()
 
+      implicit val fileNameAndSizeEncoder = org.apache.spark.sql.Encoders.product[FileNameAndSize]
+
       val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
 
       // The logic below is as follows:
@@ -188,67 +192,90 @@
           val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
           fileStatusIterator.flatMap { fileStatus =>
             if (fileStatus.isDir) {
-              Iterator.single(
-                relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true))
+              Iterator.single(FileNameAndSize(
+                relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
             } else {
               val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
               val dirsWithSlash = dirs.map { p =>
-                relativize(new Path(p), fs, reservoirBase, isDir = true)
+                val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
+                FileNameAndSize(relativizedPath, 0L)
               }
               dirsWithSlash ++ Iterator(
-                relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = false))
+                FileNameAndSize(relativize(
+                  fileStatus.getHadoopPath, fs, reservoirBase, isDir = false), fileStatus.length))
             }
           }
-        }.groupBy(col("value").as("path"))
-        .count()
+        }.groupBy(col("path")).agg(count(new Column("*")).as("count"), sum("length").as("length"))
        .join(validFiles, Seq("path"), "leftanti")
        .where(col("count") === 1)
+
+
+      val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first
+      val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
+        0L
+      } else {
+        sizeOfDataToDeleteRow.getLong(0)
+      }
+
+      val diffFiles = diff
         .select(col("path"))
         .as[String]
         .map { relativePath =>
           assert(!stringToPath(relativePath).isAbsolute,
             "Shouldn't have any absolute paths for deletion here.")
           pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
         }
+      val timeTakenToIdentifyEligibleFiles =
+        System.currentTimeMillis() - startTimeToIdentifyEligibleFiles
 
       if (dryRun) {
-        val numFiles = diff.count()
+        val numFiles = diffFiles.count()
         val stats = DeltaVacuumStats(
           isDryRun = true,
           specifiedRetentionMillis = retentionMillis,
           defaultRetentionMillis = deltaLog.tombstoneRetentionMillis,
           minRetainedTimestamp = deleteBeforeTimestamp,
           dirsPresentBeforeDelete = dirCounts,
-          objectsDeleted = numFiles)
+          objectsDeleted = numFiles,
+          sizeOfDataToDelete = sizeOfDataToDelete,
+          timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
+          timeTakenForDelete = 0L)
 
         recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
         logConsole(s"Found $numFiles files and directories in a total of " +
-          s"$dirCounts directories that are safe to delete.")
+          s"$dirCounts directories that are safe to delete.$stats")
 
-        return diff.map(f => stringToPath(f).toString).toDF("path")
+        return diffFiles.map(f => stringToPath(f).toString).toDF("path")
       }
       logVacuumStart(
         spark,
         deltaLog,
         path,
-        diff,
+        diffFiles,
+        sizeOfDataToDelete,
         retentionMillis,
         deltaLog.tombstoneRetentionMillis)
 
+      val deleteStartTime = System.currentTimeMillis()
       val filesDeleted = try {
-        delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled,
-          parallelDeletePartitions)
-      } catch { case t: Throwable =>
-        logVacuumEnd(deltaLog, spark, path)
-        throw t
+        delete(diffFiles, spark, basePath,
+          hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
+      } catch {
+        case t: Throwable =>
+          logVacuumEnd(deltaLog, spark, path)
+          throw t
       }
+      val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime
       val stats = DeltaVacuumStats(
         isDryRun = false,
         specifiedRetentionMillis = retentionMillis,
         defaultRetentionMillis = deltaLog.tombstoneRetentionMillis,
         minRetainedTimestamp = deleteBeforeTimestamp,
         dirsPresentBeforeDelete = dirCounts,
-        objectsDeleted = filesDeleted)
+        objectsDeleted = filesDeleted,
+        sizeOfDataToDelete = sizeOfDataToDelete,
+        timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
+        timeTakenForDelete = timeTakenForDelete)
       recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
       logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
 
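An aside on the isNullAt(0) guard added above: Spark's sum over zero rows returns NULL rather than 0, so when no files are eligible for deletion the aggregate row holds a null. A small standalone illustration (assumes an active SparkSession named spark):

    import org.apache.spark.sql.{Encoders, functions => F}

    // sum() over an empty Dataset yields NULL, hence the 0L fallback in the patch.
    val empty = spark.emptyDataset(Encoders.scalaLong).toDF("length")
    val row = empty.agg(F.sum("length").cast("long")).first
    assert(row.isNullAt(0))
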
@@ -267,9 +294,11 @@ trait VacuumCommandImpl extends DeltaCommand {
       deltaLog: DeltaLog,
       path: Path,
       diff: Dataset[String],
+      sizeOfDataToDelete: Long,
       specifiedRetentionMillis: Option[Long],
       defaultRetentionMillis: Long): Unit = {
-    logInfo(s"Deleting untracked files and empty directories in $path")
+    logInfo(s"Deleting untracked files and empty directories in $path. The amount of data to be " +
+      s"deleted is $sizeOfDataToDelete (in bytes)")
   }
 
   protected def logVacuumEnd(
@@ -379,4 +408,7 @@ case class DeltaVacuumStats(
     defaultRetentionMillis: Long,
     minRetainedTimestamp: Long,
     dirsPresentBeforeDelete: Long,
-    objectsDeleted: Long)
+    objectsDeleted: Long,
+    sizeOfDataToDelete: Long,
+    timeTakenToIdentifyEligibleFiles: Long,
+    timeTakenForDelete: Long)

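With this change, both the dry-run and the real path record the new fields in the delta.gc.stats event, and the dry run additionally appends the stats to its console summary line. A hedged usage sketch from Scala (table path and retention are illustrative; VACUUM ... DRY RUN is existing Delta SQL):

    // Dry run: prints "Found <n> files and directories in a total of <d>
    // directories that are safe to delete." followed directly by the stats.
    spark.sql("VACUUM delta.`/tmp/delta/events` RETAIN 168 HOURS DRY RUN")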