Skip to content

Commit

Permalink
Improve CDF reading speed by scanning in batch
Browse files Browse the repository at this point in the history
This PR improves CDF reading speed and reduces the number of tasks by creating a single scan for all CDF files from all versions (*). Compared to the previous approach (which creates a scan for each version), the new method significantly reduces the number of cloud requests and puts less stress on the Spark scheduler.

(*) Due to a limitation in how we broadcast DVs, the combining will happen for files that have only one associated DV across the entire requested version range.

GitOrigin-RevId: faf9a5ce062914ee182ff86d5f5696aeb62d1421
  • Loading branch information
xupefei authored and tdas committed Aug 17, 2023
1 parent 7e87192 commit d19e989
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -706,30 +706,28 @@ trait CDCReaderImpl extends DeltaLogging {
.append(removeFilesMap(removeKey))
}

// Convert maps back into Seq[CDCDataSpec] and feed it into a single scan. This will greatly
// reduce the number of tasks.
val finalAddFilesSpecs = buildCDCDataSpecSeq(finalAddFiles, versionToCommitInfo)
val finalRemoveFilesSpecs = buildCDCDataSpecSeq(finalRemoveFiles, versionToCommitInfo)

val dfAddsAndRemoves = ListBuffer[DataFrame]()

finalAddFiles.foreach { case (tableVersion, addFiles) =>
val commitInfo = versionToCommitInfo.get(tableVersion.version)
if (finalAddFilesSpecs.nonEmpty) {
dfAddsAndRemoves.append(
scanIndex(
spark,
new CdcAddFileIndex(
spark,
Seq(new CDCDataSpec(tableVersion, addFiles.toSeq, commitInfo)),
deltaLog,
deltaLog.dataPath,
snapshot),
new CdcAddFileIndex(spark, finalAddFilesSpecs, deltaLog, deltaLog.dataPath, snapshot),
isStreaming))
}

finalRemoveFiles.foreach { case (tableVersion, removeFiles) =>
val commitInfo = versionToCommitInfo.get(tableVersion.version)
if (finalRemoveFilesSpecs.nonEmpty) {
dfAddsAndRemoves.append(
scanIndex(
spark,
new TahoeRemoveFileIndex(
spark,
Seq(new CDCDataSpec(tableVersion, removeFiles.toSeq, commitInfo)),
finalRemoveFilesSpecs,
deltaLog,
deltaLog.dataPath,
snapshot),
Expand Down Expand Up @@ -778,6 +776,9 @@ trait CDCReaderImpl extends DeltaLogging {
}
}

// We have to build one scan for each version because DVs attached to actions will be
// broadcasted in [[ScanWithDeletionVectors.createBroadcastDVMap]] which is not version-aware.
// Here, one file can have different row index filters in different versions.
val dfs = ListBuffer[DataFrame]()
// Scan for masked rows as change_type = "insert",
// see explanation in [[generateFileActionsWithInlineDv]].
Expand All @@ -794,7 +795,6 @@ trait CDCReaderImpl extends DeltaLogging {
snapshot,
rowIndexFilters =
Some(fileActionsToIfNotContainedRowIndexFilters(addFiles.toSeq))),

isStreaming))
}

Expand All @@ -813,7 +813,6 @@ trait CDCReaderImpl extends DeltaLogging {
snapshot,
rowIndexFilters =
Some(fileActionsToIfNotContainedRowIndexFilters(removeFiles.toSeq))),

isStreaming))
}

Expand Down Expand Up @@ -1011,6 +1010,14 @@ trait CDCReaderImpl extends DeltaLogging {
leftCopy
}

/**
 * Flattens a per-version map of file actions into a sequence of [[CDCDataSpec]]s,
 * attaching the matching [[CommitInfo]] (if any) to each version's spec. The result
 * can be fed into a single file index so all versions are served by one scan.
 *
 * @param actionsByVersion    file actions grouped by the table version they belong to
 * @param versionToCommitInfo commit info keyed by version number; versions absent
 *                            from this map yield a spec with no commit info
 */
private def buildCDCDataSpecSeq[T <: FileAction](
    actionsByVersion: MutableMap[TableVersion, ListBuffer[T]],
    versionToCommitInfo: MutableMap[Long, CommitInfo]
): Seq[CDCDataSpec[T]] = {
  actionsByVersion.toSeq.map { case (tableVersion, actions) =>
    // Lookup returns Option[CommitInfo]; CDCDataSpec accepts the Option directly.
    new CDCDataSpec(tableVersion, actions.toSeq, versionToCommitInfo.get(tableVersion.version))
  }
}

/**
* Represents the changes between some start and end version of a Delta table
* @param fileChangeDf contains all of the file changes (AddFile, RemoveFile, AddCDCFile)
Expand Down

0 comments on commit d19e989

Please sign in to comment.