Skip to content

Commit

Permalink
Fix streaming when the same file occurs with different DVs in the sam…
Browse files Browse the repository at this point in the history
…e batch

## Description

There was an edge case in streaming with deletion vectors in the source, where in `ignoreChanges`-mode it could happen that if the same file occurred with different DVs in the same batch (or both with a DV and without a DV), then we would read the file with the wrong DV, since we broadcast the DVs to the scans by data file path.

This PR fixes this issue, by reading files from different versions in different scans and then taking the union of the result to build the final `DataFrame` for the batch.

Added new tests for having 2 DML commands (DELETE->DELETE and DELETE->INSERT) in the same batch for all change modi.

## Does this PR introduce _any_ user-facing changes?

No.

Closes #1899

Signed-off-by: larsk-db <lars.kroll@databricks.com>
GitOrigin-RevId: 43a2d479832ba9b4be7b888c0a633e725729744a
  • Loading branch information
larsk-db authored and vkorukanti committed Aug 11, 2023
1 parent 1ebea3d commit d36623f
Show file tree
Hide file tree
Showing 2 changed files with 453 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ package org.apache.spark.sql.delta.sources
import java.io.FileNotFoundException
import java.sql.Timestamp

import scala.collection.mutable
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import scala.util.matching.Regex

import org.apache.spark.sql.delta._
Expand Down Expand Up @@ -334,16 +332,40 @@ trait DeltaSourceBase extends Source
* @param indexedFiles actions iterator from which to generate the DataFrame.
*/
protected def createDataFrame(indexedFiles: Iterator[IndexedFile]): DataFrame = {
val addFilesList = indexedFiles
.map(_.getFileAction)
.filter(_.isInstanceOf[AddFile])
.asInstanceOf[Iterator[AddFile]].toArray

deltaLog.createDataFrame(
readSnapshotDescriptor,
addFilesList,
isStreaming = true
)
val addFiles = indexedFiles
.filter(_.getFileAction.isInstanceOf[AddFile])
.toSeq
val hasDeletionVectors =
addFiles.exists(_.getFileAction.asInstanceOf[AddFile].deletionVector != null)
if (hasDeletionVectors) {
// Read AddFiles from different versions in different scans.
// This avoids an issue where we might read the same file with different deletion vectors in
// the same scan, which we cannot support as long we broadcast a map of DVs for lookup.
// This code can be removed once we can pass the DVs into the scan directly together with the
// AddFile/PartitionedFile entry.
addFiles
.groupBy(_.version)
.values
.map { addFilesList =>
deltaLog.createDataFrame(
readSnapshotDescriptor,
addFilesList.map(_.getFileAction.asInstanceOf[AddFile]),
isStreaming = true)
}
.reduceOption(_ union _)
.getOrElse {
// If we filtered out all the values before the groupBy, just return an empty DataFrame.
deltaLog.createDataFrame(
readSnapshotDescriptor,
Seq.empty[AddFile],
isStreaming = true)
}
} else {
deltaLog.createDataFrame(
readSnapshotDescriptor,
addFiles.map(_.getFileAction.asInstanceOf[AddFile]),
isStreaming = true)
}
}

/**
Expand Down
Loading

0 comments on commit d36623f

Please # to comment.