diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 711e05eb9c2..bcfad684056 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.commands.optimize._
 import org.apache.spark.sql.delta.files.SQLMetricsReporting
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.util.BinPackingUtils

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext.SPARK_JOB_GROUP_ID
@@ -42,6 +43,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SystemClock, ThreadUtils}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable

 /** Base class defining abstract optimize command */
 abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaCommand {
@@ -56,22 +58,22 @@ abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaComman
    * - validates that we already collect stats for all the columns used in `unresolvedZOrderByCols`
    *
    * @param spark [[SparkSession]] to use
-   * @param txn the [[OptimisticTransaction]] being used to optimize
+   * @param snapshot the [[Snapshot]] to optimize from
    * @param unresolvedZOrderByCols Seq of [[UnresolvedAttribute]] corresponding to zOrderBy columns
    */
   def validateZorderByColumns(
       spark: SparkSession,
-      txn: OptimisticTransaction,
+      snapshot: Snapshot,
       unresolvedZOrderByCols: Seq[UnresolvedAttribute]): Unit = {
     if (unresolvedZOrderByCols.isEmpty) return

-    val metadata = txn.snapshot.metadata
+    val metadata = snapshot.metadata
     val partitionColumns = metadata.partitionColumns.toSet
     val dataSchema = StructType(metadata.schema.filterNot(c => partitionColumns.contains(c.name)))
     val df = spark.createDataFrame(new java.util.ArrayList[Row](), dataSchema)
     val checkColStat = spark.sessionState.conf.getConf(
       DeltaSQLConf.DELTA_OPTIMIZE_ZORDER_COL_STAT_CHECK)
-    val statCollectionSchema = txn.snapshot.statCollectionLogicalSchema
+    val statCollectionSchema = snapshot.statCollectionLogicalSchema
     val colsWithoutStats = ArrayBuffer[String]()

     unresolvedZOrderByCols.foreach { colAttribute =>
@@ -142,12 +144,12 @@ case class OptimizeTableCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val table = getDeltaTable(child, "OPTIMIZE")
-    val txn = table.startTransaction()
-    if (txn.readVersion == -1) {
+    val snapshot = table.deltaLog.update()
+    if (snapshot.version == -1) {
       throw DeltaErrors.notADeltaTableException(table.deltaLog.dataPath.toString)
     }

-    if (ClusteredTableUtils.isSupported(txn.protocol)) {
+    if (ClusteredTableUtils.isSupported(snapshot.protocol)) {
       if (userPartitionPredicates.nonEmpty) {
         throw DeltaErrors.clusteringWithPartitionPredicatesException(userPartitionPredicates)
       }
@@ -156,7 +158,7 @@
       }
     }

-    val partitionColumns = txn.snapshot.metadata.partitionColumns
+    val partitionColumns = snapshot.metadata.partitionColumns

     // Parse the predicate expression into Catalyst expression and verify only simple filters
     // on partition columns are present
@@ -169,12 +171,13 @@
       predicates
     }

-    validateZorderByColumns(sparkSession, txn, zOrderBy)
+    validateZorderByColumns(sparkSession, snapshot, zOrderBy)
     val zOrderByColumns = zOrderBy.map(_.name).toSeq

     new OptimizeExecutor(
       sparkSession,
-      txn,
+      snapshot,
+      table.catalogTable,
       partitionPredicates,
       zOrderByColumns,
       isAutoCompact = false,
@@ -207,17 +210,35 @@ case class DeltaOptimizeContext(
   }
 }

+/**
+ * A bin represents a single set of files that are being re-written in a single Spark job.
+ * For compaction, each bin is rewritten into a single output file. For clustering, this is
+ * an entire partition for Z-ordering, or an entire ZCube for liquid clustering.
+ *
+ * @param partitionValues The partition this set of files is in
+ * @param files The list of files being re-written
+ */
+case class Bin(partitionValues: Map[String, String], files: Seq[AddFile])
+
+/**
+ * A batch represents all the bins that will be processed and committed in a single transaction.
+ *
+ * @param bins The set of bins to process in this transaction
+ */
+case class Batch(bins: Seq[Bin])
+
 /**
  * Optimize job which compacts small files into larger files to reduce
  * the number of files and potentially allow more efficient reads.
  *
  * @param sparkSession Spark environment reference.
- * @param txn The transaction used to optimize this table
+ * @param snapshot The snapshot of the table to optimize
  * @param partitionPredicate List of partition predicates to select subset of files to optimize.
  */
 class OptimizeExecutor(
     sparkSession: SparkSession,
-    txn: OptimisticTransaction,
+    snapshot: Snapshot,
+    catalogTable: Option[CatalogTable],
     partitionPredicate: Seq[Expression],
     zOrderByColumns: Seq[String],
     isAutoCompact: Boolean,
@@ -231,12 +252,12 @@ class OptimizeExecutor(
    * 3. Clustering
    */
   private val optimizeStrategy =
-    OptimizeTableStrategy(sparkSession, txn.snapshot, optimizeContext, zOrderByColumns)
+    OptimizeTableStrategy(sparkSession, snapshot, optimizeContext, zOrderByColumns)

   /** Timestamp to use in [[FileAction]] */
   private val operationTimestamp = new SystemClock().getTimeMillis()

-  private val isClusteredTable = ClusteredTableUtils.isSupported(txn.snapshot.protocol)
+  private val isClusteredTable = ClusteredTableUtils.isSupported(snapshot.protocol)

   private val isMultiDimClustering =
     optimizeStrategy.isInstanceOf[ClusteringStrategy] ||
@@ -246,27 +267,31 @@ class OptimizeExecutor(
     if (zOrderByColumns.nonEmpty) {
       zOrderByColumns
     } else if (isClusteredTable) {
-      ClusteringColumnInfo.extractLogicalNames(txn.snapshot)
+      ClusteringColumnInfo.extractLogicalNames(snapshot)
     } else {
       Nil
     }
   }

+  private val partitionSchema = snapshot.metadata.partitionSchema
+
   def optimize(): Seq[Row] = {
-    recordDeltaOperation(txn.deltaLog, "delta.optimize") {
+    recordDeltaOperation(snapshot.deltaLog, "delta.optimize") {
       val minFileSize = optimizeContext.minFileSize.getOrElse(
         sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE))
       val maxFileSize = optimizeContext.maxFileSize.getOrElse(
         sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE))
       val maxDeletedRowsRatio = optimizeContext.maxDeletedRowsRatio.getOrElse(
         sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO))
+      val batchSize = sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_BATCH_SIZE)

-      val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
-      val partitionSchema = txn.metadata.partitionSchema
+      // Get all the files from the snapshot; we will register them with the individual
+      // transactions later
+      val candidateFiles = snapshot.filesForScan(partitionPredicate, keepNumRecords = true).files

       val filesToProcess = optimizeContext.reorg match {
         case Some(reorgOperation) =>
-          reorgOperation.filterFilesToReorg(sparkSession, txn.snapshot, candidateFiles)
+          reorgOperation.filterFilesToReorg(sparkSession, snapshot, candidateFiles)
         case None =>
           filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
       }
@@ -274,48 +299,35 @@ class OptimizeExecutor(

       val jobs = groupFilesIntoBins(partitionsToCompact)

-      val maxThreads =
-        sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
-      val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup =>
-        runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize)
-      }.flatten
-
-      val addedFiles = updates.collect { case a: AddFile => a }
-      val removedFiles = updates.collect { case r: RemoveFile => r }
-      val removedDVs = filesToProcess.filter(_.deletionVector != null).map(_.deletionVector).toSeq
-      if (addedFiles.size > 0) {
-        val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
-        commitAndRetry(txn, getOperation(), updates, metrics) { newTxn =>
-          val newPartitionSchema = newTxn.metadata.partitionSchema
-          val candidateSetOld = candidateFiles.map(_.path).toSet
-          val candidateSetNew = newTxn.filterFiles(partitionPredicate).map(_.path).toSet
-
-          // As long as all of the files that we compacted are still part of the table,
-          // and the partitioning has not changed it is valid to continue to try
-          // and commit this checkpoint.
-          if (candidateSetOld.subsetOf(candidateSetNew) && partitionSchema == newPartitionSchema) {
-            true
-          } else {
-            val deleted = candidateSetOld -- candidateSetNew
-            logWarning(s"The following compacted files were deleted " +
-              s"during checkpoint ${deleted.mkString(",")}. Aborting the compaction.")
-            false
-          }
-        }
+      val batchResults = batchSize match {
+        case Some(size) =>
+          val batches = BinPackingUtils.binPackBySize[Bin, Bin](
+            jobs,
+            bin => bin.files.map(_.size).sum,
+            bin => bin,
+            size)
+          batches.map(batch => runOptimizeBatch(Batch(batch), maxFileSize))
+        case None =>
+          Seq(runOptimizeBatch(Batch(jobs), maxFileSize))
       }

+      val addedFiles = batchResults.map(_._1).flatten
+      val removedFiles = batchResults.map(_._2).flatten
+      val removedDVs = batchResults.map(_._3).flatten
+
       val optimizeStats = OptimizeStats()
       optimizeStats.addedFilesSizeStats.merge(addedFiles)
       optimizeStats.removedFilesSizeStats.merge(removedFiles)
-      optimizeStats.numPartitionsOptimized = jobs.map(j => j._1).distinct.size
-      optimizeStats.numBatches = jobs.size
+      optimizeStats.numPartitionsOptimized = jobs.map(j => j.partitionValues).distinct.size
+      optimizeStats.numBins = jobs.size
+      optimizeStats.numBatches = batchResults.size
       optimizeStats.totalConsideredFiles = candidateFiles.size
       optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size
       optimizeStats.totalClusterParallelism = sparkSession.sparkContext.defaultParallelism
-      val numTableColumns = txn.snapshot.metadata.schema.size
+      val numTableColumns = snapshot.metadata.schema.size
       optimizeStats.numTableColumns = numTableColumns
       optimizeStats.numTableColumnsWithStats =
-        DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(txn.snapshot.metadata)
+        DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(snapshot.metadata)
           .min(numTableColumns)
       if (removedDVs.size > 0) {
         optimizeStats.deletionVectorStats = Some(DeletionVectorStats(
@@ -325,7 +337,7 @@

       optimizeStrategy.updateOptimizeStats(optimizeStats, removedFiles, jobs)

-      return Seq(Row(txn.deltaLog.dataPath.toString, optimizeStats.toOptimizeMetrics))
+      return Seq(Row(snapshot.deltaLog.dataPath.toString, optimizeStats.toOptimizeMetrics))
     }
   }

@@ -362,7 +374,7 @@ class OptimizeExecutor(
    */
   private def groupFilesIntoBins(
       partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])])
-    : Seq[(Map[String, String], Seq[AddFile])] = {
+    : Seq[Bin] = {
     val maxBinSize = optimizeStrategy.maxBinSize
     partitionsToCompact.flatMap {
       case (partition, files) =>
@@ -400,8 +412,59 @@ class OptimizeExecutor(
           bin.size > 1 || // bin has more than one file or
           bin.size == 1 && optimizeContext.reorg.nonEmpty || // always rewrite files during reorg
           isMultiDimClustering // multi-clustering
-        }.map(b => (partition, b))
+        }.map(b => Bin(partition, b))
+    }
+  }
+
+  private def runOptimizeBatch(
+      batch: Batch,
+      maxFileSize: Long
+    ): (Seq[AddFile], Seq[RemoveFile], Seq[DeletionVectorDescriptor]) = {
+    val txn = snapshot.deltaLog.startTransaction(catalogTable, Some(snapshot))
+
+    val filesToProcess = batch.bins.flatMap(_.files)
+
+    txn.trackFilesRead(filesToProcess)
+    txn.trackReadPredicates(partitionPredicate)
+
+    val maxThreads =
+      sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
+    val updates = ThreadUtils.parmap(batch.bins, "OptimizeJob", maxThreads) { partitionBinGroup =>
+      runOptimizeBinJob(txn, partitionBinGroup.partitionValues, partitionBinGroup.files,
+        maxFileSize)
+    }.flatten
+
+    val addedFiles = updates.collect { case a: AddFile => a }
+    val removedFiles = updates.collect { case r: RemoveFile => r }
+    val removedDVs = filesToProcess.filter(_.deletionVector != null).map(_.deletionVector).toSeq
+    if (addedFiles.size > 0) {
+      val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
+      commitAndRetry(txn, getOperation(), updates, metrics) { newTxn =>
+        val newPartitionSchema = newTxn.metadata.partitionSchema
+        val candidateSetOld = filesToProcess.map(_.path).toSet
+        // We specifically don't list the files through the transaction since we are potentially
+        // only processing a subset of them. If the transaction is still valid, we will
+        // register the files and predicate below
+        val candidateSetNew =
+          newTxn.snapshot.filesForScan(partitionPredicate).files.map(_.path).toSet
+
+        // As long as all of the files that we compacted are still part of the table,
+        // and the partitioning has not changed it is valid to continue to try
+        // and commit this checkpoint.
+        if (candidateSetOld.subsetOf(candidateSetNew) && partitionSchema == newPartitionSchema) {
+          // Make sure the files we are processing are registered with the transaction
+          newTxn.trackFilesRead(filesToProcess)
+          newTxn.trackReadPredicates(partitionPredicate)
+          true
+        } else {
+          val deleted = candidateSetOld -- candidateSetNew
+          logWarning(s"The following compacted files were deleted " +
+            s"during checkpoint ${deleted.mkString(",")}. Aborting the compaction.")
+          false
+        }
+      }
     }
+    (addedFiles, removedFiles, removedDVs)
   }

   /**
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala
index dc84e606440..884099a5b50 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala
@@ -89,7 +89,7 @@ trait OptimizeTableStrategy {
   def updateOptimizeStats(
       optimizeStats: OptimizeStats,
       removedFiles: Seq[RemoveFile],
-      bins: Seq[(Map[String, String], Seq[AddFile])]): Unit
+      bins: Seq[Bin]): Unit
 }

 object OptimizeTableStrategy {
@@ -151,7 +151,7 @@ case class CompactionStrategy(
   override def updateOptimizeStats(
       optimizeStats: OptimizeStats,
       removedFiles: Seq[RemoveFile],
-      bins: Seq[(Map[String, String], Seq[AddFile])]): Unit = {}
+      bins: Seq[Bin]): Unit = {}
 }

 /** Implements ZOrder strategy */
@@ -171,7 +171,7 @@ case class ZOrderStrategy(
   override def updateOptimizeStats(
       optimizeStats: OptimizeStats,
       removedFiles: Seq[RemoveFile],
-      bins: Seq[(Map[String, String], Seq[AddFile])]): Unit = {
+      bins: Seq[Bin]): Unit = {
     val inputFileStats =
       ZOrderFileStats(removedFiles.size, removedFiles.map(_.size.getOrElse(0L)).sum)
     optimizeStats.zOrderStats = Some(ZOrderStats(
@@ -227,7 +227,7 @@ case class ClusteringStrategy(
   override def updateOptimizeStats(
       optimizeStats: OptimizeStats,
       removedFiles: Seq[RemoveFile],
-      bins: Seq[(Map[String, String], Seq[AddFile])]): Unit = {
+      bins: Seq[Bin]): Unit = {
     clusteringStatsCollector.numOutputZCubes = bins.size
     optimizeStats.clusteringStats = Option(clusteringStatsCollector.getClusteringStats)
   }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/OptimizeStats.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/OptimizeStats.scala
index e21733db8eb..9aa39a3bf99 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/OptimizeStats.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/OptimizeStats.scala
@@ -30,6 +30,7 @@ case class OptimizeStats(
     var numPartitionsOptimized: Long = 0,
     var zOrderStats: Option[ZOrderStats] = None,
     var clusteringStats: Option[ClusteringStats] = None,
+    var numBins: Long = 0,
     var numBatches: Long = 0,
     var totalConsideredFiles: Long = 0,
     var totalFilesSkipped: Long = 0,
@@ -54,6 +55,7 @@ case class OptimizeStats(
       partitionsOptimized = numPartitionsOptimized,
       zOrderStats = zOrderStats,
       clusteringStats = clusteringStats,
+      numBins = numBins,
       numBatches = numBatches,
       totalConsideredFiles = totalConsideredFiles,
       totalFilesSkipped = totalFilesSkipped,
@@ -200,6 +202,7 @@ object FileSizeStatsWithHistogram {
  * @param partitionsOptimized Number of partitions optimized
  * @param zOrderStats Z-Order stats
  * @param clusteringStats Clustering stats
+ * @param numBins Number of bins
  * @param numBatches Number of batches
  * @param totalConsideredFiles Number of files considered for the Optimize operation.
  * @param totalFilesSkipped Number of files that are skipped from being Optimized.
@@ -227,6 +230,7 @@ case class OptimizeMetrics(
     partitionsOptimized: Long = 0,
     zOrderStats: Option[ZOrderStats] = None,
     clusteringStats: Option[ClusteringStats] = None,
+    numBins: Long,
     numBatches: Long,
     totalConsideredFiles: Long,
     totalFilesSkipped: Long = 0,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala
index 72404d2ea77..44cecac5e7e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala
@@ -195,15 +195,14 @@ trait AutoCompactBase extends PostCommitHook with DeltaLogging {
       .getOrElse(maxFileSize / 2))
     val maxFileSizeOpt = Some(maxFileSize)
     recordDeltaOperation(deltaLog, s"$opType.execute") {
-      val txn = deltaLog.startTransaction(catalogTable)
       val optimizeContext = DeltaOptimizeContext(
         reorg = None,
         minFileSizeOpt,
         maxFileSizeOpt,
         maxDeletedRowsRatio = maxDeletedRowsRatio
       )
-      val rows = new OptimizeExecutor(spark, txn, partitionPredicates, Seq(), true, optimizeContext)
-        .optimize()
+      val rows = new OptimizeExecutor(spark, deltaLog.update(), catalogTable, partitionPredicates,
+        zOrderByColumns = Seq(), isAutoCompact = true, optimizeContext).optimize()
       val metrics = rows.map(_.getAs[OptimizeMetrics](1))
       recordDeltaEvent(deltaLog, s"$opType.execute.metrics", data = metrics.head)
       metrics
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 526b9f39d05..56d7d0c06ea 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1226,6 +1226,19 @@ trait DeltaSQLConfBase {
       .checkValue(_ > 0, "'optimize.maxThreads' must be positive.")
       .createWithDefault(15)

+  val DELTA_OPTIMIZE_BATCH_SIZE =
+    buildConf("optimize.batchSize")
+      .internal()
+      .doc(
+        """
+          |The size of a batch within an OPTIMIZE job. After a batch completes, its
+          | changes are committed to the transaction log, allowing for incremental
+          | progress.
+          |""".stripMargin)
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(_ > 0, "batchSize has to be positive")
+      .createOptional
+
   val DELTA_OPTIMIZE_REPARTITION_ENABLED =
     buildConf("optimize.repartition.enabled")
       .internal()
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala
index 87f35ff0cc9..9c4a54275ce 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala
@@ -181,7 +181,7 @@ class AutoCompactSuite extends
         assert(metricsLog.totalConsideredFiles === 93)
         assert(metricsLog.numFilesAdded == 3)
         assert(metricsLog.numFilesRemoved == 93)
-        assert(metricsLog.numBatches === 3)
+        assert(metricsLog.numBins === 3)
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/clustering/ClusteredTableClusteringSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/clustering/ClusteredTableClusteringSuite.scala
index 362dd40b539..5ea700241c0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/clustering/ClusteredTableClusteringSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/clustering/ClusteredTableClusteringSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.sql.delta.skipping.ClusteredTableTestUtils
 import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
 import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.SparkFunSuite

@@ -103,4 +104,56 @@ class ClusteredTableClusteringSuite extends SparkFunSuite
       }
     }
   }
+
+  test("optimize clustered table with batching") {
+    Seq(("1", 2), ("1g", 1)).foreach { case (batchSize, optimizeCommits) =>
+      withClusteredTable(
+        table = table,
+        schema = "col1 int, col2 int",
+        clusterBy = "col1, col2") {
+        addFiles(table, numFiles = 4)
+        val files0 = getFiles(table)
+        assert(files0.size === 4)
+        assertNotClustered(files0)
+
+        val totalSize = files0.toSeq.map(_.size).sum
+        val halfSize = totalSize / 2
+
+        withSQLConf(
+          DeltaSQLConf.DELTA_OPTIMIZE_BATCH_SIZE.key -> batchSize,
+          DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE.key -> halfSize.toString,
+          DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_TARGET_CUBE_SIZE.key -> halfSize.toString) {
+          // Optimize should create 2 cubes, which will be in separate batches if the batch size
+          // is small enough
+          runOptimize(table) { metrics =>
+            assert(metrics.numFilesRemoved == 4)
+            assert(metrics.numFilesAdded == 2)
+          }
+
+          val files1 = getFiles(table)
+          assert(files1.size == 2)
+          assertClustered(files1)
+
+          val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
+
+          val commits = deltaLog.history.getHistory(None)
+          assert(commits.filter(_.operation == "OPTIMIZE").length == optimizeCommits)
+        }
+      }
+    }
+  }
+
+  test("optimize clustered table with batching on an empty table") {
+    withClusteredTable(
+      table = table,
+      schema = "col1 int, col2 int",
+      clusterBy = "col1, col2") {
+      withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_BATCH_SIZE.key -> "1g") {
+        runOptimize(table) { metrics =>
+          assert(metrics.numFilesRemoved == 0)
+          assert(metrics.numFilesAdded == 0)
+        }
+      }
+    }
+  }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala
index 63c551326c9..0fa2cb8641d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.LongType

 /**
  * Base class containing tests for Delta table Optimize (file compaction)
@@ -536,6 +537,93 @@ trait OptimizeCompactionSuiteBase extends QueryTest
     }
   }

+  def optimizeWithBatching(
+      batchSize: String,
+      expectedCommits: Int,
+      condition: Option[String],
+      partitionFileCount: Map[String, Int]): Unit = {
+    withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_BATCH_SIZE.key -> batchSize) {
+      withTempDir { tempDir =>
+        def writeData(count: Int): Unit = {
+          spark.range(count).select('id, 'id % 5 as "part")
+            .coalesce(1)
+            .write
+            .partitionBy("part")
+            .format("delta")
+            .mode("append")
+            .save(tempDir.getAbsolutePath)
+        }
+
+        writeData(10)
+        writeData(100)
+
+        val data = spark.read.format("delta").load(tempDir.getAbsolutePath()).collect()
+
+        executeOptimizePath(tempDir.getAbsolutePath, condition)
+
+        val df = spark.read.format("delta").load(tempDir.getAbsolutePath)
+        checkAnswer(df, data)
+
+        val deltaLog = loadDeltaLog(tempDir.getAbsolutePath)
+
+        val commits = deltaLog.history.getHistory(None)
+        assert(commits.filter(_.operation == "OPTIMIZE").length == expectedCommits)
+
+        val files = groupInputFilesByPartition(df.inputFiles, deltaLog)
+        for ((part, fileCount) <- partitionFileCount) {
+          assert(files(("part", part)).length == fileCount)
+        }
+      }
+    }
+  }
+
+  test("optimize command with batching") {
+    // Batch size of 1 byte means each bin will run in its own batch, and lead to 5 batches,
+    // one for each partition.
+    Seq(("1", 5), ("1g", 1)).foreach { case (batchSize, optimizeCommits) =>
+      // All partitions should be one file after optimizing
+      val partitionFileCount = (0 to 4).map(_.toString -> 1).toMap
+
+      optimizeWithBatching(batchSize, optimizeCommits, None, partitionFileCount)
+    }
+  }
+
+  test("optimize command with where clause and batching") {
+    // Batch size of 1 byte means each bin will run in its own batch, and lead to 2 batches
+    // for the two partitions we are optimizing.
+    Seq(("1", 2), ("1g", 1)).foreach { case (batchSize, optimizeCommits) =>
+      // First two partitions should have 1 file, last 3 should have two
+      val partitionFileCount = Map(
+        "0" -> 1,
+        "1" -> 1,
+        "2" -> 2,
+        "3" -> 2,
+        "4" -> 2
+      )
+      optimizeWithBatching(batchSize, optimizeCommits, Some("part <= 1"),
+        partitionFileCount)
+    }
+  }
+
+  test("optimize an empty table with batching") {
+    // Batch size of 1 byte means each bin will run in its own batch
+    withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_BATCH_SIZE.key -> "1") {
+      withTempDir { tempDir =>
+        DeltaTable.create(spark)
+          .location(tempDir.getAbsolutePath())
+          .addColumn("id", LongType)
+          .addColumn("part", LongType)
+          .partitionedBy("part")
+          .execute()
+
+        // Just make sure it succeeds
+        executeOptimizePath(tempDir.getAbsolutePath)
+
+        assert(spark.read.format("delta").load(tempDir.getAbsolutePath()).count() == 0)
+      }
+    }
+  }
+
   /**
    * Utility method to append the given data to the Delta table located at the given path.
    * Optionally partitions the data.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeMetricsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeMetricsSuite.scala
index c1d3590e3cb..f0ffde993b1 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeMetricsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeMetricsSuite.scala
@@ -128,6 +128,7 @@ trait OptimizeMetricsSuiteBase extends QueryTest
       StructField("partitionsOptimized", LongType, nullable = false),
       StructField("zOrderStats", zOrderStatsSchema, nullable = true),
       StructField("clusteringStats", clusteringStatsSchema, nullable = true),
+      StructField("numBins", LongType, nullable = false),
       StructField("numBatches", LongType, nullable = false),
       StructField("totalConsideredFiles", LongType, nullable = false),
       StructField("totalFilesSkipped", LongType, nullable = false),
@@ -218,7 +219,8 @@ trait OptimizeMetricsSuiteBase extends QueryTest
         filesRemoved = FileSizeStats().toFileSizeMetrics,
         partitionsOptimized = 0,
         zOrderStats = None,
-        numBatches = 0,
+        numBins = 0,
+        numBatches = 1,
         totalConsideredFiles = 1,
         totalFilesSkipped = 1,
         preserveInsertionOrder = preserveInsertionOrder,
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeZOrderSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeZOrderSuite.scala
index 2e56f7b67d0..7d5ed9a6720 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeZOrderSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeZOrderSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.functions.{col, floor, lit, max, struct}
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.LongType

 trait OptimizePartitionTableHelper extends QueryTest {
   def testPartition(str: String)(testFun: => Any): Unit = {
@@ -193,6 +194,93 @@ trait OptimizeZOrderSuiteBase extends OptimizePartitionTableHelper
     }
   }

+  def optimizeWithBatching(
+      batchSize: String,
+      expectedCommits: Int,
+      condition: Option[String],
+      partitionFileCount: Map[String, Int]): Unit = {
+    withSQLConf(DELTA_OPTIMIZE_BATCH_SIZE.key -> batchSize) {
+      withTempDir { tempDir =>
+        def writeData(count: Int): Unit = {
+          spark.range(count).select('id, 'id % 5 as "part")
+            .coalesce(1)
+            .write
+            .partitionBy("part")
+            .format("delta")
+            .mode("append")
+            .save(tempDir.getAbsolutePath)
+        }
+
+        writeData(10)
+        writeData(100)
+
+        val data = spark.read.format("delta").load(tempDir.getAbsolutePath()).collect()
+
+        executeOptimizePath(tempDir.getAbsolutePath, Seq("id"), condition)
+
+        val df = spark.read.format("delta").load(tempDir.getAbsolutePath)
+        checkAnswer(df, data)
+
+        val deltaLog = loadDeltaLog(tempDir.getAbsolutePath)
+
+        val commits = deltaLog.history.getHistory(None)
+        assert(commits.filter(_.operation == "OPTIMIZE").length == expectedCommits)
+
+        val files = groupInputFilesByPartition(df.inputFiles, deltaLog)
+        for ((part, fileCount) <- partitionFileCount) {
+          assert(files(("part", part)).length == fileCount)
+        }
+      }
+    }
+  }
+
+  test("optimize command with batching") {
+    // Batch size of 1 byte means each bin will run in its own batch, and lead to 5 batches,
+    // one for each partition.
+    Seq(("1", 5), ("1g", 1)).foreach { case (batchSize, optimizeCommits) =>
+      // All partitions should be one file after optimizing
+      val partitionFileCount = (0 to 4).map(_.toString -> 1).toMap
+
+      optimizeWithBatching(batchSize, optimizeCommits, None, partitionFileCount)
+    }
+  }
+
+  test("optimize command with where clause and batching") {
+    // Batch size of 1 byte means each bin will run in its own batch, and lead to 2 batches
+    // for the two partitions we are optimizing.
+    Seq(("1", 2), ("1g", 1)).foreach { case (batchSize, optimizeCommits) =>
+      // First two partitions should have 1 file, last 3 should have two
+      val partitionFileCount = Map(
+        "0" -> 1,
+        "1" -> 1,
+        "2" -> 2,
+        "3" -> 2,
+        "4" -> 2
+      )
+      optimizeWithBatching(batchSize, optimizeCommits, Some("part <= 1"),
+        partitionFileCount)
+    }
+  }
+
+  test("optimize an empty table with batching") {
+    // Batch size of 1 byte means each bin will run in its own batch
+    withSQLConf(DELTA_OPTIMIZE_BATCH_SIZE.key -> "1") {
+      withTempDir { tempDir =>
+        DeltaTable.create(spark)
+          .location(tempDir.getAbsolutePath())
+          .addColumn("id", LongType)
+          .addColumn("part", LongType)
+          .partitionedBy("part")
+          .execute()
+
+        // Just make sure it succeeds
+        executeOptimizePath(tempDir.getAbsolutePath, Seq("id"))
+
+        assert(spark.read.format("delta").load(tempDir.getAbsolutePath()).count() == 0)
+      }
+    }
+  }
+
   statsTest("optimize command: interleaving") {
     def statsDF(deltaLog: DeltaLog): DataFrame = {
       val (c1, c2, c3) = ("c1".phy(deltaLog), "c2".phy(deltaLog), "c3".phy(deltaLog))