From 4622db66fdab41a7e1729a58ab7daa8038f70da1 Mon Sep 17 00:00:00 2001
From: Kam Cheung Ting
Date: Fri, 22 Sep 2023 16:00:05 -0700
Subject: [PATCH] Code refactor

Refactor code inside StatisticsCollection.scala.

GitOrigin-RevId: d59a7df828db4df6eafec94d8622176b3fc8bb49
---
 .../delta/stats/StatisticsCollection.scala    | 59 +++++++++----------
 1 file changed, 29 insertions(+), 30 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala
index c6e7d4b9c70..1321647311b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala
@@ -26,6 +26,7 @@ import scala.language.existentials
 import org.apache.spark.sql.delta.{Checkpoints, DeletionVectorsTableFeature, DeltaColumnMapping, DeltaColumnMappingMode, DeltaConfigs, DeltaErrors, DeltaIllegalArgumentException, DeltaLog, DeltaUDF, NoMapping}
 import org.apache.spark.sql.delta.DeltaColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY
 import org.apache.spark.sql.delta.DeltaOperations.ComputeStats
+import org.apache.spark.sql.delta.OptimisticTransaction
 import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol}
 import org.apache.spark.sql.delta.commands.DeletionVectorUtils
 import org.apache.spark.sql.delta.commands.DeltaCommand
@@ -702,6 +703,33 @@ object StatisticsCollection extends DeltaCommand {
     (StructType(fields.toSeq), accCnt)
   }
 
+  /**
+   * Compute new AddFile entries carrying delta statistics, by aggregating the data skipping
+   * columns of each parquet file.
+   */
+  private def computeNewAddFiles(
+      deltaLog: DeltaLog,
+      txn: OptimisticTransaction,
+      files: Seq[AddFile]): Array[AddFile] = {
+    val dataPath = deltaLog.dataPath
+    val pathToAddFileMap = generateCandidateFileMap(dataPath, files)
+    val persistentDVsReadable = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)
+    // Throw an error when the table contains DVs, because the existing method of stats
+    // recomputation doesn't work on tables with DVs. It needs to take DV files into
+    // consideration (TODO).
+    if (persistentDVsReadable) {
+      throw DeltaErrors.statsRecomputeNotSupportedOnDvTables()
+    }
+    val fileDataFrame = deltaLog
+      .createDataFrame(txn.snapshot, addFiles = files, isStreaming = false)
+      .withColumn("path", col("_metadata.file_path"))
+    val newStats = fileDataFrame.groupBy(col("path")).agg(to_json(txn.statsCollector))
+    newStats.collect().map { r =>
+      val add = getTouchedFile(dataPath, r.getString(0), pathToAddFileMap)
+      add.copy(dataChange = false, stats = r.getString(1))
+    }
+  }
+
   /**
    * Recomputes statistics for a Delta table. This can be used to compute stats if they were never
    * collected or to recompute corrupted statistics.
@@ -717,38 +745,9 @@ object StatisticsCollection extends DeltaCommand {
       fileFilter: AddFile => Boolean = af => true): Unit = {
     val txn = deltaLog.startTransaction()
     verifyPartitionPredicates(spark, txn.metadata.partitionColumns, predicates)
-    // Save the current AddFiles that match the predicates so we can update their stats
     val files = txn.filterFiles(predicates).filter(fileFilter)
-    val pathToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, files)
-    val persistentDVsReadable = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)
-
-    // Use the stats collector to recompute stats
-    val dataPath = deltaLog.dataPath
-    val snapshot = txn.snapshot
-    val newAddFiles = {
-      // Throw error when the table contains DVs, because existing method of stats
-      // recomputation doesn't work on tables with DVs. It needs to take into consideration of
-      // DV files (TODO).
-      if (persistentDVsReadable) {
-        throw DeltaErrors.statsRecomputeNotSupportedOnDvTables()
-      }
-      {
-        val fileDataFrame = deltaLog
-          .createDataFrame(txn.snapshot, addFiles = files, isStreaming = false)
-          .withColumn("path", col("_metadata.file_path"))
-        val newStats =
-          {
-            fileDataFrame.groupBy(col("path")).agg(to_json(txn.statsCollector))
-          }
-        // Use the new stats to update the AddFiles and commit back to the DeltaLog
-        newStats.collect().map { r =>
-          val add = getTouchedFile(dataPath, r.getString(0), pathToAddFileMap)
-          add.copy(dataChange = false, stats = r.getString(1))
-        }
-      }
-    }
-
+    val newAddFiles = computeNewAddFiles(deltaLog, txn, files)
     txn.commit(newAddFiles, ComputeStats(predicates))
   }
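
For reviewers who want to see the per-file aggregation pattern that computeNewAddFiles
extracts, here is a self-contained sketch against plain Spark. It is an illustration
only, not Delta code: the input path and the column names ("id" is not used, "value" is
hypothetical) are assumptions, and the hand-rolled struct(...) merely stands in for the
schema-driven txn.statsCollector expression used in the patch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, lit, max, min, struct, to_json}

object StatsAggregationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("stats-aggregation-sketch")
      .getOrCreate()

    // Tag every row with its source file, exactly as the patch does with
    // withColumn("path", col("_metadata.file_path")).
    val df = spark.read.parquet("/tmp/example_table") // hypothetical path
      .withColumn("path", col("_metadata.file_path"))

    // One row per file: a JSON string of per-file statistics. In the patch the
    // aggregate is txn.statsCollector; this struct of min/max/count plays that role.
    val statsPerFile = df
      .groupBy(col("path"))
      .agg(to_json(struct(
        count(lit(1)).as("numRecords"),
        min(col("value")).as("minValue"),
        max(col("value")).as("maxValue"))).as("stats"))

    // r.getString(0) is the file path and r.getString(1) the JSON stats, matching
    // how computeNewAddFiles copies them back onto each AddFile.
    statsPerFile.collect().foreach { r =>
      println(s"${r.getString(0)} -> ${r.getString(1)}")
    }
    spark.stop()
  }
}

Collecting one small row per file keeps the driver-side work proportional to the number
of files, which is why the patch can map the result back onto AddFile actions via
getTouchedFile and commit them with dataChange = false.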