diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala index 12f8bd4c563..9d399c9379b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala @@ -706,30 +706,28 @@ trait CDCReaderImpl extends DeltaLogging { .append(removeFilesMap(removeKey)) } + // Convert maps back into Seq[CDCDataSpec] and feed it into a single scan. This will greatly + // reduce the number of tasks. + val finalAddFilesSpecs = buildCDCDataSpecSeq(finalAddFiles, versionToCommitInfo) + val finalRemoveFilesSpecs = buildCDCDataSpecSeq(finalRemoveFiles, versionToCommitInfo) + val dfAddsAndRemoves = ListBuffer[DataFrame]() - finalAddFiles.foreach { case (tableVersion, addFiles) => - val commitInfo = versionToCommitInfo.get(tableVersion.version) + if (finalAddFilesSpecs.nonEmpty) { dfAddsAndRemoves.append( scanIndex( spark, - new CdcAddFileIndex( - spark, - Seq(new CDCDataSpec(tableVersion, addFiles.toSeq, commitInfo)), - deltaLog, - deltaLog.dataPath, - snapshot), + new CdcAddFileIndex(spark, finalAddFilesSpecs, deltaLog, deltaLog.dataPath, snapshot), isStreaming)) } - finalRemoveFiles.foreach { case (tableVersion, removeFiles) => - val commitInfo = versionToCommitInfo.get(tableVersion.version) + if (finalRemoveFilesSpecs.nonEmpty) { dfAddsAndRemoves.append( scanIndex( spark, new TahoeRemoveFileIndex( spark, - Seq(new CDCDataSpec(tableVersion, removeFiles.toSeq, commitInfo)), + finalRemoveFilesSpecs, deltaLog, deltaLog.dataPath, snapshot), @@ -778,6 +776,9 @@ trait CDCReaderImpl extends DeltaLogging { } } + // We have to build one scan for each version because DVs attached to actions will be + // broadcasted in [[ScanWithDeletionVectors.createBroadcastDVMap]] which is not version-aware. + // Here, one file can have different row index filters in different versions. val dfs = ListBuffer[DataFrame]() // Scan for masked rows as change_type = "insert", // see explanation in [[generateFileActionsWithInlineDv]]. @@ -794,7 +795,6 @@ trait CDCReaderImpl extends DeltaLogging { snapshot, rowIndexFilters = Some(fileActionsToIfNotContainedRowIndexFilters(addFiles.toSeq))), - isStreaming)) } @@ -813,7 +813,6 @@ trait CDCReaderImpl extends DeltaLogging { snapshot, rowIndexFilters = Some(fileActionsToIfNotContainedRowIndexFilters(removeFiles.toSeq))), - isStreaming)) } @@ -1011,6 +1010,14 @@ trait CDCReaderImpl extends DeltaLogging { leftCopy } + private def buildCDCDataSpecSeq[T <: FileAction]( + actionsByVersion: MutableMap[TableVersion, ListBuffer[T]], + versionToCommitInfo: MutableMap[Long, CommitInfo] + ): Seq[CDCDataSpec[T]] = actionsByVersion.map { case (fileVersion, addFiles) => + val commitInfo = versionToCommitInfo.get(fileVersion.version) + new CDCDataSpec(fileVersion, addFiles.toSeq, commitInfo) + }.toSeq + /** * Represents the changes between some start and end version of a Delta table * @param fileChangeDf contains all of the file changes (AddFile, RemoveFile, AddCDCFile)