[Spark] Optimize batching / incremental progress (#3089)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Resolves #3081

Adds support for splitting an optimize run into batches, controlled by a new
config, `spark.databricks.delta.optimize.batchSize`. Batches are created by
grouping existing bins together until `batchSize` is reached. The default
behavior is unchanged: batching is only enabled when `batchSize` is explicitly
configured.
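
To make the grouping rule concrete, here is a small self-contained sketch of the idea. This is an illustration only — `batchBySize` is a made-up helper, not the `BinPackingUtils.binPackBySize` used in the actual change — but it shows how bins accumulate into a batch until the configured size is reached:

```scala
// Greedy batching sketch: keep adding bins to the current batch until the next
// bin would push the batch past maxBatchSize, then start a new batch.
def batchBySize[T](bins: Seq[T], sizeOf: T => Long, maxBatchSize: Long): Seq[Seq[T]] = {
  val batches = scala.collection.mutable.ArrayBuffer.empty[Vector[T]]
  var current = Vector.empty[T]
  var currentSize = 0L
  for (bin <- bins) {
    if (current.nonEmpty && currentSize + sizeOf(bin) > maxBatchSize) {
      batches += current
      current = Vector.empty[T]
      currentSize = 0L
    }
    current :+= bin
    currentSize += sizeOf(bin)
  }
  if (current.nonEmpty) batches += current
  batches.toSeq
}

// Bin sizes of 40, 70, 30, 60 (GB) with a 100 GB batch size
// group into batches of (40), (70, 30), (60).
val example = batchBySize[Long](Seq(40L, 70L, 30L, 60L), identity, maxBatchSize = 100L)
```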

This applies to all optimization paths. I don't see any reason it shouldn't
apply to compaction, Z-ordering, clustering, auto-compaction, or reorg/DV
rewriting if a user configures it.
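
For example, a user might turn it on roughly like this (a sketch only: it assumes an active `SparkSession` named `spark`, the table path is a placeholder, and I'm assuming the config accepts a byte-size string like other Delta size configs — the exact accepted format is defined by the new `DELTA_OPTIMIZE_BATCH_SIZE` conf in the diff):

```scala
// Hypothetical usage sketch: cap each OPTIMIZE transaction at roughly 100 GB of
// input data so progress is committed incrementally rather than all at the end.
spark.conf.set("spark.databricks.delta.optimize.batchSize", "100g")

// Any optimize path picks the setting up once configured; placeholder table path below.
spark.sql("OPTIMIZE delta.`/tmp/delta/events` ZORDER BY (eventDate)")
```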

The way transactions are handled within the optimize executor had to be
updated. Instead of creating a single transaction up front, we list all the
files in the most recent snapshot and then create a separate transaction for
each batch.

This is especially important for clustering, since there is no way to manually
optimize a subset of the table using partition filtering. Without incremental
progress, a lot of execution time and storage space can be wasted if something
fails before the entire table has been optimized.

## How was this patch tested?

A simple new unit test is added. I can add others as well; for now I'm mainly
looking for feedback on the approach and suggestions for what other tests to
add.

## Does this PR introduce _any_ user-facing changes?

Yes, this adds a new optimization capability that is disabled by default.
Kimahriman authored Jun 15, 2024
1 parent ee350db commit d23324d
Showing 10 changed files with 373 additions and 63 deletions.
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.commands.optimize._
import org.apache.spark.sql.delta.files.SQLMetricsReporting
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.BinPackingUtils

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext.SPARK_JOB_GROUP_ID
@@ -42,6 +43,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.types._
import org.apache.spark.util.{SystemClock, ThreadUtils}
import org.apache.spark.sql.catalyst.catalog.CatalogTable

/** Base class defining abstract optimize command */
abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaCommand {
@@ -56,22 +58,22 @@ abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaComman
* - validates that we already collect stats for all the columns used in `unresolvedZOrderByCols`
*
* @param spark [[SparkSession]] to use
* @param txn the [[OptimisticTransaction]] being used to optimize
* @param snapshot the [[Snapshot]] being used to optimize from
* @param unresolvedZOrderByCols Seq of [[UnresolvedAttribute]] corresponding to zOrderBy columns
*/
def validateZorderByColumns(
spark: SparkSession,
txn: OptimisticTransaction,
snapshot: Snapshot,
unresolvedZOrderByCols: Seq[UnresolvedAttribute]): Unit = {
if (unresolvedZOrderByCols.isEmpty) return
val metadata = txn.snapshot.metadata
val metadata = snapshot.metadata
val partitionColumns = metadata.partitionColumns.toSet
val dataSchema =
StructType(metadata.schema.filterNot(c => partitionColumns.contains(c.name)))
val df = spark.createDataFrame(new java.util.ArrayList[Row](), dataSchema)
val checkColStat = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_ZORDER_COL_STAT_CHECK)
val statCollectionSchema = txn.snapshot.statCollectionLogicalSchema
val statCollectionSchema = snapshot.statCollectionLogicalSchema
val colsWithoutStats = ArrayBuffer[String]()

unresolvedZOrderByCols.foreach { colAttribute =>
@@ -142,12 +144,12 @@ case class OptimizeTableCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val table = getDeltaTable(child, "OPTIMIZE")
val txn = table.startTransaction()
if (txn.readVersion == -1) {
val snapshot = table.deltaLog.update()
if (snapshot.version == -1) {
throw DeltaErrors.notADeltaTableException(table.deltaLog.dataPath.toString)
}

if (ClusteredTableUtils.isSupported(txn.protocol)) {
if (ClusteredTableUtils.isSupported(snapshot.protocol)) {
if (userPartitionPredicates.nonEmpty) {
throw DeltaErrors.clusteringWithPartitionPredicatesException(userPartitionPredicates)
}
@@ -156,7 +158,7 @@ case class OptimizeTableCommand(
}
}

val partitionColumns = txn.snapshot.metadata.partitionColumns
val partitionColumns = snapshot.metadata.partitionColumns
// Parse the predicate expression into Catalyst expression and verify only simple filters
// on partition columns are present

@@ -169,12 +171,13 @@
predicates
}

validateZorderByColumns(sparkSession, txn, zOrderBy)
validateZorderByColumns(sparkSession, snapshot, zOrderBy)
val zOrderByColumns = zOrderBy.map(_.name).toSeq

new OptimizeExecutor(
sparkSession,
txn,
snapshot,
table.catalogTable,
partitionPredicates,
zOrderByColumns,
isAutoCompact = false,
Expand Down Expand Up @@ -207,17 +210,35 @@ case class DeltaOptimizeContext(
}
}

/**
* A bin represents a single set of files that are being re-written in a single Spark job.
* For compaction, this represents a single file being written. For clustering, this is
* an entire partition for Z-ordering, or an entire ZCube for liquid clustering.
*
* @param partitionValues The partition this set of files is in
* @param files The list of files being re-written
*/
case class Bin(partitionValues: Map[String, String], files: Seq[AddFile])

/**
* A batch represents all the bins that will be processed and committed in a single transaction.
*
* @param bins The set of bins to process in this transaction
*/
case class Batch(bins: Seq[Bin])

/**
* Optimize job which compacts small files into larger files to reduce
* the number of files and potentially allow more efficient reads.
*
* @param sparkSession Spark environment reference.
* @param txn The transaction used to optimize this table
* @param snapshot The snapshot of the table to optimize
* @param partitionPredicate List of partition predicates to select subset of files to optimize.
*/
class OptimizeExecutor(
sparkSession: SparkSession,
txn: OptimisticTransaction,
snapshot: Snapshot,
catalogTable: Option[CatalogTable],
partitionPredicate: Seq[Expression],
zOrderByColumns: Seq[String],
isAutoCompact: Boolean,
@@ -231,12 +252,12 @@ class OptimizeExecutor(
* 3. Clustering
*/
private val optimizeStrategy =
OptimizeTableStrategy(sparkSession, txn.snapshot, optimizeContext, zOrderByColumns)
OptimizeTableStrategy(sparkSession, snapshot, optimizeContext, zOrderByColumns)

/** Timestamp to use in [[FileAction]] */
private val operationTimestamp = new SystemClock().getTimeMillis()

private val isClusteredTable = ClusteredTableUtils.isSupported(txn.snapshot.protocol)
private val isClusteredTable = ClusteredTableUtils.isSupported(snapshot.protocol)

private val isMultiDimClustering =
optimizeStrategy.isInstanceOf[ClusteringStrategy] ||
@@ -246,76 +267,67 @@ class OptimizeExecutor(
if (zOrderByColumns.nonEmpty) {
zOrderByColumns
} else if (isClusteredTable) {
ClusteringColumnInfo.extractLogicalNames(txn.snapshot)
ClusteringColumnInfo.extractLogicalNames(snapshot)
} else {
Nil
}
}

private val partitionSchema = snapshot.metadata.partitionSchema

def optimize(): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
recordDeltaOperation(snapshot.deltaLog, "delta.optimize") {
val minFileSize = optimizeContext.minFileSize.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE))
val maxFileSize = optimizeContext.maxFileSize.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE))
val maxDeletedRowsRatio = optimizeContext.maxDeletedRowsRatio.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO))
val batchSize = sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_BATCH_SIZE)

val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
val partitionSchema = txn.metadata.partitionSchema
// Get all the files from the snapshot, we will register them with the individual
// transactions later
val candidateFiles = snapshot.filesForScan(partitionPredicate, keepNumRecords = true).files

val filesToProcess = optimizeContext.reorg match {
case Some(reorgOperation) =>
reorgOperation.filterFilesToReorg(sparkSession, txn.snapshot, candidateFiles)
reorgOperation.filterFilesToReorg(sparkSession, snapshot, candidateFiles)
case None =>
filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
}
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

val jobs = groupFilesIntoBins(partitionsToCompact)

val maxThreads =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup =>
runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize)
}.flatten

val addedFiles = updates.collect { case a: AddFile => a }
val removedFiles = updates.collect { case r: RemoveFile => r }
val removedDVs = filesToProcess.filter(_.deletionVector != null).map(_.deletionVector).toSeq
if (addedFiles.size > 0) {
val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
commitAndRetry(txn, getOperation(), updates, metrics) { newTxn =>
val newPartitionSchema = newTxn.metadata.partitionSchema
val candidateSetOld = candidateFiles.map(_.path).toSet
val candidateSetNew = newTxn.filterFiles(partitionPredicate).map(_.path).toSet

// As long as all of the files that we compacted are still part of the table,
// and the partitioning has not changed it is valid to continue to try
// and commit this checkpoint.
if (candidateSetOld.subsetOf(candidateSetNew) && partitionSchema == newPartitionSchema) {
true
} else {
val deleted = candidateSetOld -- candidateSetNew
logWarning(s"The following compacted files were deleted " +
s"during checkpoint ${deleted.mkString(",")}. Aborting the compaction.")
false
}
}
val batchResults = batchSize match {
case Some(size) =>
val batches = BinPackingUtils.binPackBySize[Bin, Bin](
jobs,
bin => bin.files.map(_.size).sum,
bin => bin,
size)
batches.map(batch => runOptimizeBatch(Batch(batch), maxFileSize))
case None =>
Seq(runOptimizeBatch(Batch(jobs), maxFileSize))
}

val addedFiles = batchResults.map(_._1).flatten
val removedFiles = batchResults.map(_._2).flatten
val removedDVs = batchResults.map(_._3).flatten

val optimizeStats = OptimizeStats()
optimizeStats.addedFilesSizeStats.merge(addedFiles)
optimizeStats.removedFilesSizeStats.merge(removedFiles)
optimizeStats.numPartitionsOptimized = jobs.map(j => j._1).distinct.size
optimizeStats.numBatches = jobs.size
optimizeStats.numPartitionsOptimized = jobs.map(j => j.partitionValues).distinct.size
optimizeStats.numBins = jobs.size
optimizeStats.numBatches = batchResults.size
optimizeStats.totalConsideredFiles = candidateFiles.size
optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size
optimizeStats.totalClusterParallelism = sparkSession.sparkContext.defaultParallelism
val numTableColumns = txn.snapshot.metadata.schema.size
val numTableColumns = snapshot.metadata.schema.size
optimizeStats.numTableColumns = numTableColumns
optimizeStats.numTableColumnsWithStats =
DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(txn.snapshot.metadata)
DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(snapshot.metadata)
.min(numTableColumns)
if (removedDVs.size > 0) {
optimizeStats.deletionVectorStats = Some(DeletionVectorStats(
@@ -325,7 +337,7 @@ class OptimizeExecutor(

optimizeStrategy.updateOptimizeStats(optimizeStats, removedFiles, jobs)

return Seq(Row(txn.deltaLog.dataPath.toString, optimizeStats.toOptimizeMetrics))
return Seq(Row(snapshot.deltaLog.dataPath.toString, optimizeStats.toOptimizeMetrics))
}
}

Expand Down Expand Up @@ -362,7 +374,7 @@ class OptimizeExecutor(
*/
private def groupFilesIntoBins(
partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])])
: Seq[(Map[String, String], Seq[AddFile])] = {
: Seq[Bin] = {
val maxBinSize = optimizeStrategy.maxBinSize
partitionsToCompact.flatMap {
case (partition, files) =>
Expand Down Expand Up @@ -400,8 +412,59 @@ class OptimizeExecutor(
bin.size > 1 || // bin has more than one file or
bin.size == 1 && optimizeContext.reorg.nonEmpty || // always rewrite files during reorg
isMultiDimClustering // multi-clustering
}.map(b => (partition, b))
}.map(b => Bin(partition, b))
}
}

private def runOptimizeBatch(
batch: Batch,
maxFileSize: Long
): (Seq[AddFile], Seq[RemoveFile], Seq[DeletionVectorDescriptor]) = {
val txn = snapshot.deltaLog.startTransaction(catalogTable, Some(snapshot))

val filesToProcess = batch.bins.flatMap(_.files)

txn.trackFilesRead(filesToProcess)
txn.trackReadPredicates(partitionPredicate)

val maxThreads =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
val updates = ThreadUtils.parmap(batch.bins, "OptimizeJob", maxThreads) { partitionBinGroup =>
runOptimizeBinJob(txn, partitionBinGroup.partitionValues, partitionBinGroup.files,
maxFileSize)
}.flatten

val addedFiles = updates.collect { case a: AddFile => a }
val removedFiles = updates.collect { case r: RemoveFile => r }
val removedDVs = filesToProcess.filter(_.deletionVector != null).map(_.deletionVector).toSeq
if (addedFiles.size > 0) {
val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
commitAndRetry(txn, getOperation(), updates, metrics) { newTxn =>
val newPartitionSchema = newTxn.metadata.partitionSchema
val candidateSetOld = filesToProcess.map(_.path).toSet
// We specifically don't list the files through the transaction since we are potentially
// only processing a subset of them below. If the transaction is still valid, we will
// register the files and predicate below
val candidateSetNew =
newTxn.snapshot.filesForScan(partitionPredicate).files.map(_.path).toSet

// As long as all of the files that we compacted are still part of the table,
// and the partitioning has not changed it is valid to continue to try
// and commit this checkpoint.
if (candidateSetOld.subsetOf(candidateSetNew) && partitionSchema == newPartitionSchema) {
// Make sure the files we are processing are registered with the transaction
newTxn.trackFilesRead(filesToProcess)
newTxn.trackReadPredicates(partitionPredicate)
true
} else {
val deleted = candidateSetOld -- candidateSetNew
logWarning(s"The following compacted files were deleted " +
s"during checkpoint ${deleted.mkString(",")}. Aborting the compaction.")
false
}
}
}
(addedFiles, removedFiles, removedDVs)
}

/**
@@ -89,7 +89,7 @@ trait OptimizeTableStrategy {
def updateOptimizeStats(
optimizeStats: OptimizeStats,
removedFiles: Seq[RemoveFile],
bins: Seq[(Map[String, String], Seq[AddFile])]): Unit
bins: Seq[Bin]): Unit
}

object OptimizeTableStrategy {
@@ -151,7 +151,7 @@ case class CompactionStrategy(
override def updateOptimizeStats(
optimizeStats: OptimizeStats,
removedFiles: Seq[RemoveFile],
bins: Seq[(Map[String, String], Seq[AddFile])]): Unit = {}
bins: Seq[Bin]): Unit = {}
}

/** Implements ZOrder strategy */
@@ -171,7 +171,7 @@ case class ZOrderStrategy(
override def updateOptimizeStats(
optimizeStats: OptimizeStats,
removedFiles: Seq[RemoveFile],
bins: Seq[(Map[String, String], Seq[AddFile])]): Unit = {
bins: Seq[Bin]): Unit = {
val inputFileStats =
ZOrderFileStats(removedFiles.size, removedFiles.map(_.size.getOrElse(0L)).sum)
optimizeStats.zOrderStats = Some(ZOrderStats(
@@ -227,7 +227,7 @@ case class ClusteringStrategy(
override def updateOptimizeStats(
optimizeStats: OptimizeStats,
removedFiles: Seq[RemoveFile],
bins: Seq[(Map[String, String], Seq[AddFile])]): Unit = {
bins: Seq[Bin]): Unit = {
clusteringStatsCollector.numOutputZCubes = bins.size
optimizeStats.clusteringStats = Option(clusteringStatsCollector.getClusteringStats)
}
@@ -30,6 +30,7 @@ case class OptimizeStats(
var numPartitionsOptimized: Long = 0,
var zOrderStats: Option[ZOrderStats] = None,
var clusteringStats: Option[ClusteringStats] = None,
var numBins: Long = 0,
var numBatches: Long = 0,
var totalConsideredFiles: Long = 0,
var totalFilesSkipped: Long = 0,
@@ -54,6 +55,7 @@
partitionsOptimized = numPartitionsOptimized,
zOrderStats = zOrderStats,
clusteringStats = clusteringStats,
numBins = numBins,
numBatches = numBatches,
totalConsideredFiles = totalConsideredFiles,
totalFilesSkipped = totalFilesSkipped,
@@ -200,6 +202,7 @@ object FileSizeStatsWithHistogram {
* @param partitionsOptimized Number of partitions optimized
* @param zOrderStats Z-Order stats
* @param clusteringStats Clustering stats
* @param numBins Number of bins
* @param numBatches Number of batches
* @param totalConsideredFiles Number of files considered for the Optimize operation.
* @param totalFilesSkipped Number of files that are skipped from being Optimized.
@@ -227,6 +230,7 @@ case class OptimizeMetrics(
partitionsOptimized: Long = 0,
zOrderStats: Option[ZOrderStats] = None,
clusteringStats: Option[ClusteringStats] = None,
numBins: Long,
numBatches: Long,
totalConsideredFiles: Long,
totalFilesSkipped: Long = 0,