From 29d3a09289f5d799714df448ea9818d2fb5fcdd7 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 30 Sep 2022 11:49:24 -0700 Subject: [PATCH] Fix Delta source initialization issue when using AvailableNow When using AvailableNow, here are the flows for Delta source: - New query: prepareForTriggerAvailableNow, (latestOffset -> getBatch)*. - Restarted query: prepareForTriggerAvailableNow, getBatch, (latestOffset -> getBatch)*. When restarting a query, getBatch is required to be called first. Otherwise, previousOffset will not be set and latestOffset will assume it's a new query and return an incorrect offset. Today, we call latestOffset inside prepareForTriggerAvailableNow, which causes the incorrect initialization for lastOffsetForTriggerAvailableNow because previousOffset is not set yet at this moment when restarting a query. In this PR, we add isTriggerAvailableNow and set it to true in prepareForTriggerAvailableNow without initializing lastOffsetForTriggerAvailableNow, and make lastOffsetForTriggerAvailableNow initialization happen inside latestOffset (for new query) or getBatch (for restarted query) so that it can be initialized correctly. We add an internal flag spark.databricks.delta.streaming.availableNow.offsetInitializationFix.enabled to allow users switching back to the old behavior. In addition, we also add a validation for previousOffset and currentOffset to make sure they never move backward. This would ensure we will not cause data duplication even if we have any bug in offset generation. spark.databricks.delta.streaming.offsetValidation.enabled is added to allow users turning off the check. GitOrigin-RevId: a2684fbcdecf8f621fd3ce751f3e71fc96a17d7c --- .../sql/delta/sources/DeltaSQLConf.scala | 18 +++ .../spark/sql/delta/sources/DeltaSource.scala | 62 ++++++++- .../sql/delta/sources/DeltaSourceOffset.scala | 23 ++++ .../spark/sql/delta/DeltaSourceSuite.scala | 130 ++++++++++++++++++ 4 files changed, 229 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index b8dc06673e0..916bdab9b2b 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -628,6 +628,24 @@ trait DeltaSQLConfBase { .longConf .createWithDefault(128L * 1024 * 1024) // 128MB + val STREAMING_OFFSET_VALIDATION = + buildConf("streaming.offsetValidation.enabled") + .internal() + .doc("Whether to validate whether delta streaming source generates a smaller offset and " + + "moves backward.") + .booleanConf + .createWithDefault(true) + + val STREAMING_AVAILABLE_NOW_OFFSET_INITIALIZATION_FIX = + buildConf("streaming.availableNow.offsetInitializationFix.enabled") + .internal() + .doc( + """Whether to enable the offset initializaion fix for AvailableNow. + |This is just a flag to provide the mitigation option if the fix introduces + |any bugs.""".stripMargin) + .booleanConf + .createWithDefault(true) + val LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS = buildConf("loadFileSystemConfigsFromDataFrameOptions") .internal() diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala index 51d955d040a..5ce7b737bf8 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala @@ -134,15 +134,48 @@ trait DeltaSourceBase extends Source } } - protected var lastOffsetForTriggerAvailableNow: DeltaSourceOffset = _ + private val enableAvailableNowOffsetInitializationFix = + spark.sessionState.conf.getConf(DeltaSQLConf.STREAMING_AVAILABLE_NOW_OFFSET_INITIALIZATION_FIX) + + /** + * When `AvailableNow` is used, this offset will be the upper bound where this run of the query + * will process up. We may run multiple micro batches, but the query will stop itself when it + * reaches this offset. + */ + protected var lastOffsetForTriggerAvailableNow: DeltaSourceOffset = null + + private var isLastOffsetForTriggerAvailableNowInitialized = false + + private var isTriggerAvailableNow = false override def prepareForTriggerAvailableNow(): Unit = { - val offset = latestOffset(null, ReadLimit.allAvailable()) - if (offset != null) { - lastOffsetForTriggerAvailableNow = DeltaSourceOffset(tableId, offset) + isTriggerAvailableNow = true + if (!enableAvailableNowOffsetInitializationFix) { + val offset = latestOffsetInternal(ReadLimit.allAvailable()) + if (offset != null) { + lastOffsetForTriggerAvailableNow = DeltaSourceOffset(tableId, offset) + } + } + } + + /** + * initialize the internal states for AvailableNow if this method is called first time after + * `prepareForTriggerAvailableNow`. + */ + protected def initForTriggerAvailableNowIfNeeded(): Unit = { + if (enableAvailableNowOffsetInitializationFix && isTriggerAvailableNow && + !isLastOffsetForTriggerAvailableNowInitialized) { + isLastOffsetForTriggerAvailableNowInitialized = true + val offset = latestOffsetInternal(ReadLimit.allAvailable()) + if (offset != null) { + lastOffsetForTriggerAvailableNow = DeltaSourceOffset(tableId, offset) + } } } + /** An internal `latestOffsetInternal` to get the latest offset. */ + protected def latestOffsetInternal(limit: ReadLimit): streaming.Offset + protected def getFileChangesWithRateLimit( fromVersion: Long, fromIndex: Long, @@ -420,6 +453,9 @@ case class DeltaSource( extends DeltaSourceBase with DeltaSourceCDCSupport { + private val shouldValidateOffsets = + spark.sessionState.conf.getConf(DeltaSQLConf.STREAMING_OFFSET_VALIDATION) + // Deprecated. Please use `ignoreDeletes` or `ignoreChanges` from now on. private val ignoreFileDeletion = { if (options.ignoreFileDeletion) { @@ -587,7 +623,16 @@ case class DeltaSource( new AdmissionLimits().toReadLimit } + /** + * This should only be called by the engine. Call `latestOffsetInternal` instead if you need to + * get the latest offset. + */ override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = { + initForTriggerAvailableNowIfNeeded() + latestOffsetInternal(limit) + } + + override protected def latestOffsetInternal(limit: ReadLimit): streaming.Offset = { val limits = AdmissionLimits(limit) val currentOffset = if (previousOffset == null) { @@ -596,6 +641,11 @@ case class DeltaSource( getNextOffsetFromPreviousOffset(previousOffset, limits) } logDebug(s"previousOffset -> currentOffset: $previousOffset -> $currentOffset") + if (shouldValidateOffsets && previousOffset != null) { + currentOffset.foreach { current => + DeltaSourceOffset.validateOffsets(previousOffset, DeltaSourceOffset(tableId, current)) + } + } currentOffset.orNull } @@ -717,6 +767,10 @@ case class DeltaSource( override def getBatch(startOffsetOption: Option[Offset], end: Offset): DataFrame = { val endOffset = DeltaSourceOffset(tableId, end) previousOffset = endOffset // For recovery + // We need to initialize after `previousOffset` is set so that we can use `previousOffset` to + // know whether we are going to process files in a snapshot or a commit after restart. Even for + // the same table table, the indexes of a file in a snpshot and a commit are different. + initForTriggerAvailableNowIfNeeded() val (startVersion, startIndex, diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceOffset.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceOffset.scala index 0bb7a95085f..67bd6a4bb2c 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceOffset.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceOffset.scala @@ -124,4 +124,27 @@ object DeltaSourceOffset { case value: JValue => Some(value) } } + + /** + * Validate offsets to make sure we always move forward. Moving backward may make the query + * re-process data and cause data duplication. + */ + def validateOffsets(previousOffset: DeltaSourceOffset, currentOffset: DeltaSourceOffset): Unit = { + if (previousOffset.isStartingVersion == false && currentOffset.isStartingVersion == true) { + throw new IllegalStateException( + s"Found invalid offsets: 'isStartingVersion' fliped incorrectly. " + + s"Previous: $previousOffset, Current: $currentOffset") + } + if (previousOffset.reservoirVersion > currentOffset.reservoirVersion) { + throw new IllegalStateException( + s"Found invalid offsets: 'reservoirVersion' moved back. " + + s"Previous: $previousOffset, Current: $currentOffset") + } + if (previousOffset.reservoirVersion == currentOffset.reservoirVersion && + previousOffset.index > currentOffset.index) { + throw new IllegalStateException( + s"Found invalid offsets. 'index' moved back. " + + s"Previous: $previousOffset, Current: $currentOffset") + } + } } diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala index 928baff0154..a02b106c339 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta import java.io.{File, FileInputStream, OutputStream} import java.net.URI import java.util.UUID +import java.util.concurrent.TimeoutException import scala.concurrent.duration._ import scala.language.implicitConversions @@ -1881,6 +1882,135 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase } } } + + test("DeltaSourceOffset.validateOffsets") { + DeltaSourceOffset.validateOffsets( + previousOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false), + currentOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false) + ) + DeltaSourceOffset.validateOffsets( + previousOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false), + currentOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 5, + index = 1, + isStartingVersion = false) + ) + + assert(intercept[IllegalStateException] { + DeltaSourceOffset.validateOffsets( + previousOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false), + currentOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = true) + ) + }.getMessage.contains("Found invalid offsets: 'isStartingVersion' fliped incorrectly.")) + assert(intercept[IllegalStateException] { + DeltaSourceOffset.validateOffsets( + previousOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false), + currentOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 1, + index = 10, + isStartingVersion = false) + ) + }.getMessage.contains("Found invalid offsets: 'reservoirVersion' moved back.")) + assert(intercept[IllegalStateException] { + DeltaSourceOffset.validateOffsets( + previousOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false), + currentOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 9, + isStartingVersion = false) + ) + }.getMessage.contains("Found invalid offsets. 'index' moved back.")) + } + + test("ES-445863: delta source should not hang or reprocess data when using AvailableNow") { + withTempDirs { (inputDir, outputDir, checkpointDir) => + def runQuery(): Unit = { + val q = spark.readStream + .format("delta") + .load(inputDir.getCanonicalPath) + // Require a partition filter. The max index of files matching the partition filter must + // be less than the number of files in the second commit. + .where("part = 0") + .writeStream + .format("delta") + .trigger(Trigger.AvailableNow) + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + try { + if (!q.awaitTermination(60000)) { + throw new TimeoutException("the query didn't stop in 60 seconds") + } + } finally { + q.stop() + } + } + + spark.range(0, 1) + .selectExpr("id", "id as part") + .repartition(10) + .write + .partitionBy("part") + .format("delta") + .mode("append") + .save(inputDir.getCanonicalPath) + runQuery() + + spark.range(1, 10) + .selectExpr("id", "id as part") + .repartition(9) + .write + .partitionBy("part") + .format("delta") + .mode("append") + .save(inputDir.getCanonicalPath) + runQuery() + + checkAnswer( + spark.read.format("delta").load(outputDir.getCanonicalPath), + Row(0, 0) :: Nil) + } + } } /**