Commit 29d3a09: Fix Delta source initialization issue when using AvailableNow
When using AvailableNow, here are the flows for the Delta source:

- New query: prepareForTriggerAvailableNow, (latestOffset -> getBatch)*.
- Restarted query: prepareForTriggerAvailableNow, getBatch, (latestOffset -> getBatch)*.

When restarting a query, getBatch must be called first. Otherwise, previousOffset will not be set, and latestOffset will assume it's a new query and return an incorrect offset.
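
As a concrete illustration, here is a minimal sketch of the two flows. `SimpleSource` and `runAvailableNow` are hypothetical stand-ins for the engine-side contract, not the actual Structured Streaming API:

object AvailableNowFlows {
  trait SimpleSource {
    def prepareForTriggerAvailableNow(): Unit
    def latestOffset(): Option[Long]
    def getBatch(start: Option[Long], end: Long): Unit
  }

  // `lastCommitted` is the last committed offset from the checkpoint, if any.
  def runAvailableNow(source: SimpleSource, lastCommitted: Option[Long]): Unit = {
    source.prepareForTriggerAvailableNow()
    // Restarted query only: the engine replays the last committed batch first,
    // which is the source's only chance to set its `previousOffset`.
    lastCommitted.foreach(end => source.getBatch(None, end))
    // Both flows then run (latestOffset -> getBatch)* until no new data remains.
    var start = lastCommitted
    var next = source.latestOffset()
    while (next.exists(n => !start.contains(n))) {
      source.getBatch(start, next.get)
      start = next
      next = source.latestOffset()
    }
  }
}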

Today, we call latestOffset inside prepareForTriggerAvailableNow, which initializes lastOffsetForTriggerAvailableNow incorrectly because previousOffset has not been set yet when restarting a query.

In this PR, we add isTriggerAvailableNow and set it to true in prepareForTriggerAvailableNow without initializing lastOffsetForTriggerAvailableNow, and we make the lastOffsetForTriggerAvailableNow initialization happen inside latestOffset (for a new query) or getBatch (for a restarted query) so that it is initialized correctly. We add an internal flag, spark.databricks.delta.streaming.availableNow.offsetInitializationFix.enabled, to allow users to switch back to the old behavior.

In addition, we add a validation for previousOffset and currentOffset to make sure they never move backward. This ensures we will not cause data duplication even if there is a bug in offset generation. spark.databricks.delta.streaming.offsetValidation.enabled is added to allow users to turn off the check.
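
Both flags default to true. If either change needs to be reverted, they can be flipped at the session level; a minimal example, assuming an active SparkSession named spark:

// Revert to the old AvailableNow offset initialization behavior (mitigation only).
spark.conf.set(
  "spark.databricks.delta.streaming.availableNow.offsetInitializationFix.enabled",
  "false")
// Turn off the new offset monotonicity check.
spark.conf.set("spark.databricks.delta.streaming.offsetValidation.enabled", "false")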

GitOrigin-RevId: a2684fbcdecf8f621fd3ce751f3e71fc96a17d7c
zsxwing authored and allisonport-db committed Sep 30, 2022
1 parent ab0158d commit 29d3a09
Showing 4 changed files with 229 additions and 4 deletions.
@@ -628,6 +628,24 @@ trait DeltaSQLConfBase {
.longConf
.createWithDefault(128L * 1024 * 1024) // 128MB

val STREAMING_OFFSET_VALIDATION =
buildConf("streaming.offsetValidation.enabled")
.internal()
.doc("Whether to validate that the Delta streaming source never generates a smaller " +
"offset that moves backward.")
.booleanConf
.createWithDefault(true)

val STREAMING_AVAILABLE_NOW_OFFSET_INITIALIZATION_FIX =
buildConf("streaming.availableNow.offsetInitializationFix.enabled")
.internal()
.doc(
"""Whether to enable the offset initialization fix for AvailableNow.
|This is just a flag to provide the mitigation option if the fix introduces
|any bugs.""".stripMargin)
.booleanConf
.createWithDefault(true)

val LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS =
buildConf("loadFileSystemConfigsFromDataFrameOptions")
.internal()
@@ -134,15 +134,48 @@ trait DeltaSourceBase extends Source
}
}

protected var lastOffsetForTriggerAvailableNow: DeltaSourceOffset = _
private val enableAvailableNowOffsetInitializationFix =
spark.sessionState.conf.getConf(DeltaSQLConf.STREAMING_AVAILABLE_NOW_OFFSET_INITIALIZATION_FIX)

/**
* When `AvailableNow` is used, this offset will be the upper bound that this run of the query
* will process up to. We may run multiple micro batches, but the query will stop itself when it
* reaches this offset.
*/
protected var lastOffsetForTriggerAvailableNow: DeltaSourceOffset = null

private var isLastOffsetForTriggerAvailableNowInitialized = false

private var isTriggerAvailableNow = false

override def prepareForTriggerAvailableNow(): Unit = {
val offset = latestOffset(null, ReadLimit.allAvailable())
if (offset != null) {
lastOffsetForTriggerAvailableNow = DeltaSourceOffset(tableId, offset)
isTriggerAvailableNow = true
if (!enableAvailableNowOffsetInitializationFix) {
val offset = latestOffsetInternal(ReadLimit.allAvailable())
if (offset != null) {
lastOffsetForTriggerAvailableNow = DeltaSourceOffset(tableId, offset)
}
}
}

/**
* Initialize the internal states for AvailableNow if this method is called for the first time
* after `prepareForTriggerAvailableNow`.
*/
protected def initForTriggerAvailableNowIfNeeded(): Unit = {
if (enableAvailableNowOffsetInitializationFix && isTriggerAvailableNow &&
!isLastOffsetForTriggerAvailableNowInitialized) {
isLastOffsetForTriggerAvailableNowInitialized = true
val offset = latestOffsetInternal(ReadLimit.allAvailable())
if (offset != null) {
lastOffsetForTriggerAvailableNow = DeltaSourceOffset(tableId, offset)
}
}
}

/** An internal method to get the latest offset. */
protected def latestOffsetInternal(limit: ReadLimit): streaming.Offset

protected def getFileChangesWithRateLimit(
fromVersion: Long,
fromIndex: Long,
@@ -420,6 +453,9 @@ case class DeltaSource(
extends DeltaSourceBase
with DeltaSourceCDCSupport {

private val shouldValidateOffsets =
spark.sessionState.conf.getConf(DeltaSQLConf.STREAMING_OFFSET_VALIDATION)

// Deprecated. Please use `ignoreDeletes` or `ignoreChanges` from now on.
private val ignoreFileDeletion = {
if (options.ignoreFileDeletion) {
@@ -587,7 +623,16 @@ case class DeltaSource(
new AdmissionLimits().toReadLimit
}

/**
* This should only be called by the engine. Call `latestOffsetInternal` instead if you need to
* get the latest offset.
*/
override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
initForTriggerAvailableNowIfNeeded()
latestOffsetInternal(limit)
}

override protected def latestOffsetInternal(limit: ReadLimit): streaming.Offset = {
val limits = AdmissionLimits(limit)

val currentOffset = if (previousOffset == null) {
@@ -596,6 +641,11 @@
getNextOffsetFromPreviousOffset(previousOffset, limits)
}
logDebug(s"previousOffset -> currentOffset: $previousOffset -> $currentOffset")
if (shouldValidateOffsets && previousOffset != null) {
currentOffset.foreach { current =>
DeltaSourceOffset.validateOffsets(previousOffset, DeltaSourceOffset(tableId, current))
}
}
currentOffset.orNull
}

@@ -717,6 +767,10 @@ case class DeltaSource(
override def getBatch(startOffsetOption: Option[Offset], end: Offset): DataFrame = {
val endOffset = DeltaSourceOffset(tableId, end)
previousOffset = endOffset // For recovery
// We need to initialize after `previousOffset` is set so that we can use `previousOffset` to
// know whether we are going to process files in a snapshot or a commit after restart. Even for
// the same table, the indexes of a file in a snapshot and a commit are different.
initForTriggerAvailableNowIfNeeded()

val (startVersion,
startIndex,
@@ -124,4 +124,27 @@ object DeltaSourceOffset {
case value: JValue => Some(value)
}
}

/**
* Validate offsets to make sure we always move forward. Moving backward may make the query
* re-process data and cause data duplication.
*/
def validateOffsets(previousOffset: DeltaSourceOffset, currentOffset: DeltaSourceOffset): Unit = {
if (!previousOffset.isStartingVersion && currentOffset.isStartingVersion) {
throw new IllegalStateException(
s"Found invalid offsets: 'isStartingVersion' flipped incorrectly. " +
s"Previous: $previousOffset, Current: $currentOffset")
}
if (previousOffset.reservoirVersion > currentOffset.reservoirVersion) {
throw new IllegalStateException(
s"Found invalid offsets: 'reservoirVersion' moved back. " +
s"Previous: $previousOffset, Current: $currentOffset")
}
if (previousOffset.reservoirVersion == currentOffset.reservoirVersion &&
previousOffset.index > currentOffset.index) {
throw new IllegalStateException(
s"Found invalid offsets: 'index' moved back. " +
s"Previous: $previousOffset, Current: $currentOffset")
}
}
}
130 changes: 130 additions & 0 deletions core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
import java.io.{File, FileInputStream, OutputStream}
import java.net.URI
import java.util.UUID
import java.util.concurrent.TimeoutException

import scala.concurrent.duration._
import scala.language.implicitConversions
@@ -1881,6 +1882,135 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
}
}
}

test("DeltaSourceOffset.validateOffsets") {
DeltaSourceOffset.validateOffsets(
previousOffset = DeltaSourceOffset(
sourceVersion = 1,
reservoirId = "foo",
reservoirVersion = 4,
index = 10,
isStartingVersion = false),
currentOffset = DeltaSourceOffset(
sourceVersion = 1,
reservoirId = "foo",
reservoirVersion = 4,
index = 10,
isStartingVersion = false)
)
DeltaSourceOffset.validateOffsets(
previousOffset = DeltaSourceOffset(
sourceVersion = 1,
reservoirId = "foo",
reservoirVersion = 4,
index = 10,
isStartingVersion = false),
currentOffset = DeltaSourceOffset(
sourceVersion = 1,
reservoirId = "foo",
reservoirVersion = 5,
index = 1,
isStartingVersion = false)
)

assert(intercept[IllegalStateException] {
DeltaSourceOffset.validateOffsets(
previousOffset = DeltaSourceOffset(
sourceVersion = 1,
reservoirId = "foo",
reservoirVersion = 4,
index = 10,
isStartingVersion = false),
currentOffset = DeltaSourceOffset(
sourceVersion = 1,
reservoirId = "foo",
reservoirVersion = 4,
index = 10,
isStartingVersion = true)
)
}.getMessage.contains("Found invalid offsets: 'isStartingVersion' flipped incorrectly."))
assert(intercept[IllegalStateException] {
DeltaSourceOffset.validateOffsets(
previousOffset = DeltaSourceOffset(
sourceVersion = 1,
reservoirId = "foo",
reservoirVersion = 4,
index = 10,
isStartingVersion = false),
currentOffset = DeltaSourceOffset(
sourceVersion = 1,
reservoirId = "foo",
reservoirVersion = 1,
index = 10,
isStartingVersion = false)
)
}.getMessage.contains("Found invalid offsets: 'reservoirVersion' moved back."))
assert(intercept[IllegalStateException] {
DeltaSourceOffset.validateOffsets(
previousOffset = DeltaSourceOffset(
sourceVersion = 1,
reservoirId = "foo",
reservoirVersion = 4,
index = 10,
isStartingVersion = false),
currentOffset = DeltaSourceOffset(
sourceVersion = 1,
reservoirId = "foo",
reservoirVersion = 4,
index = 9,
isStartingVersion = false)
)
}.getMessage.contains("Found invalid offsets: 'index' moved back."))
}

test("ES-445863: delta source should not hang or reprocess data when using AvailableNow") {
withTempDirs { (inputDir, outputDir, checkpointDir) =>
def runQuery(): Unit = {
val q = spark.readStream
.format("delta")
.load(inputDir.getCanonicalPath)
// Require a partition filter. The max index of files matching the partition filter must
// be less than the number of files in the second commit.
.where("part = 0")
.writeStream
.format("delta")
.trigger(Trigger.AvailableNow)
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.start(outputDir.getCanonicalPath)
try {
if (!q.awaitTermination(60000)) {
throw new TimeoutException("the query didn't stop in 60 seconds")
}
} finally {
q.stop()
}
}

spark.range(0, 1)
.selectExpr("id", "id as part")
.repartition(10)
.write
.partitionBy("part")
.format("delta")
.mode("append")
.save(inputDir.getCanonicalPath)
runQuery()

spark.range(1, 10)
.selectExpr("id", "id as part")
.repartition(9)
.write
.partitionBy("part")
.format("delta")
.mode("append")
.save(inputDir.getCanonicalPath)
runQuery()

checkAnswer(
spark.read.format("delta").load(outputDir.getCanonicalPath),
Row(0, 0) :: Nil)
}
}
}
