
Code refactor
refactor code inside StatisticsCollection.scala.

GitOrigin-RevId: d59a7df828db4df6eafec94d8622176b3fc8bb49
kamcheungting-db authored and vkorukanti committed Sep 25, 2023
1 parent 4eb177e commit 4622db6
Showing 1 changed file with 29 additions and 30 deletions.
@@ -26,6 +26,7 @@ import scala.language.existentials
 import org.apache.spark.sql.delta.{Checkpoints, DeletionVectorsTableFeature, DeltaColumnMapping, DeltaColumnMappingMode, DeltaConfigs, DeltaErrors, DeltaIllegalArgumentException, DeltaLog, DeltaUDF, NoMapping}
 import org.apache.spark.sql.delta.DeltaColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY
 import org.apache.spark.sql.delta.DeltaOperations.ComputeStats
+import org.apache.spark.sql.delta.OptimisticTransaction
 import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol}
 import org.apache.spark.sql.delta.commands.DeletionVectorUtils
 import org.apache.spark.sql.delta.commands.DeltaCommand
@@ -702,6 +703,33 @@ object StatisticsCollection extends DeltaCommand {
     (StructType(fields.toSeq), accCnt)
   }
 
+  /**
+   * Compute AddFile entries with Delta statistics by aggregating the data skipping
+   * columns of each parquet file.
+   */
+  private def computeNewAddFiles(
+      deltaLog: DeltaLog,
+      txn: OptimisticTransaction,
+      files: Seq[AddFile]): Array[AddFile] = {
+    val dataPath = deltaLog.dataPath
+    val pathToAddFileMap = generateCandidateFileMap(dataPath, files)
+    val persistentDVsReadable = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)
+    // Throw an error when the table contains DVs, because the existing method of stats
+    // recomputation doesn't work on tables with DVs. It needs to take DV files into
+    // consideration (TODO).
+    if (persistentDVsReadable) {
+      throw DeltaErrors.statsRecomputeNotSupportedOnDvTables()
+    }
+    val fileDataFrame = deltaLog
+      .createDataFrame(txn.snapshot, addFiles = files, isStreaming = false)
+      .withColumn("path", col("_metadata.file_path"))
+    val newStats = fileDataFrame.groupBy(col("path")).agg(to_json(txn.statsCollector))
+    newStats.collect().map { r =>
+      val add = getTouchedFile(dataPath, r.getString(0), pathToAddFileMap)
+      add.copy(dataChange = false, stats = r.getString(1))
+    }
+  }
+
   /**
    * Recomputes statistics for a Delta table. This can be used to compute stats if they were never
    * collected or to recompute corrupted statistics.
@@ -717,38 +745,9 @@
       fileFilter: AddFile => Boolean = af => true): Unit = {
     val txn = deltaLog.startTransaction()
     verifyPartitionPredicates(spark, txn.metadata.partitionColumns, predicates)
 
     // Save the current AddFiles that match the predicates so we can update their stats
     val files = txn.filterFiles(predicates).filter(fileFilter)
-    val pathToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, files)
-    val persistentDVsReadable = DeletionVectorUtils.deletionVectorsReadable(txn.snapshot)
-
-    // Use the stats collector to recompute stats
-    val dataPath = deltaLog.dataPath
-    val snapshot = txn.snapshot
-    val newAddFiles = {
-      // Throw error when the table contains DVs, because existing method of stats
-      // recomputation doesn't work on tables with DVs. It needs to take into consideration of
-      // DV files (TODO).
-      if (persistentDVsReadable) {
-        throw DeltaErrors.statsRecomputeNotSupportedOnDvTables()
-      }
-      {
-        val fileDataFrame = deltaLog
-          .createDataFrame(txn.snapshot, addFiles = files, isStreaming = false)
-          .withColumn("path", col("_metadata.file_path"))
-        val newStats =
-        {
-          fileDataFrame.groupBy(col("path")).agg(to_json(txn.statsCollector))
-        }
-        // Use the new stats to update the AddFiles and commit back to the DeltaLog
-        newStats.collect().map { r =>
-          val add = getTouchedFile(dataPath, r.getString(0), pathToAddFileMap)
-          add.copy(dataChange = false, stats = r.getString(1))
-        }
-      }
-    }
-
+    val newAddFiles = computeNewAddFiles(deltaLog, txn, files)
     txn.commit(newAddFiles, ComputeStats(predicates))
   }

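For context on the pattern being extracted: the helper groups the table's rows by the parquet file they came from, aggregates a JSON stats string per file, and copies that string back onto the corresponding AddFile action with dataChange = false. Below is a minimal, self-contained sketch of that per-file aggregation using only public Spark APIs. The table path and the "id" column are hypothetical stand-ins, and the hand-built numRecords/minValues/maxValues struct only approximates what Delta's txn.statsCollector derives from the table schema and its data-skipping configuration.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object PerFileStatsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("per-file-stats-sketch")
          .master("local[*]")
          .getOrCreate()

        // Hypothetical parquet directory standing in for a Delta table's data files.
        val df = spark.read.parquet("/tmp/example_table")

        // Group rows by source file, mirroring groupBy(col("path")) in computeNewAddFiles.
        val statsPerFile = df
          .withColumn("path", col("_metadata.file_path"))
          .groupBy(col("path"))
          .agg(to_json(struct(
            count(lit(1)).as("numRecords"),
            struct(min(col("id")).as("id")).as("minValues"),  // "id" is a stand-in column
            struct(max(col("id")).as("id")).as("maxValues")
          )).as("stats"))

        // One row per file: (file path, JSON stats string). The refactored helper maps
        // each such row back to its AddFile and rewrites only the stats field.
        statsPerFile.collect().foreach { r =>
          println(s"${r.getString(0)} -> ${r.getString(1)}")
        }

        spark.stop()
      }
    }

Extracting computeNewAddFiles reduces recompute to a filter-compute-commit sequence and gives the deletion-vector guard a single home instead of burying it inside a nested block.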