diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index b8dc06673e0..916bdab9b2b 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -628,6 +628,24 @@ trait DeltaSQLConfBase {
       .longConf
       .createWithDefault(128L * 1024 * 1024) // 128MB
 
+  val STREAMING_OFFSET_VALIDATION =
+    buildConf("streaming.offsetValidation.enabled")
+      .internal()
+      .doc("Whether to validate that the Delta streaming source never generates a smaller " +
+        "offset, i.e. never moves backward.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val STREAMING_AVAILABLE_NOW_OFFSET_INITIALIZATION_FIX =
+    buildConf("streaming.availableNow.offsetInitializationFix.enabled")
+      .internal()
+      .doc(
+        """Whether to enable the offset initialization fix for AvailableNow.
+          |This is just a flag to provide the mitigation option if the fix introduces
+          |any bugs.""".stripMargin)
+      .booleanConf
+      .createWithDefault(true)
+
   val LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS =
     buildConf("loadFileSystemConfigsFromDataFrameOptions")
       .internal()
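Note: both flags are internal escape hatches rather than user-facing features. If either behavior ever needs to be disabled as a mitigation, a session-level sketch would look like the following (assuming the standard "spark.databricks.delta." prefix that buildConf prepends in this file; verify the full key against your build):

    // Hypothetical mitigation only; the keys assume the "spark.databricks.delta." prefix.
    spark.conf.set("spark.databricks.delta.streaming.offsetValidation.enabled", "false")
    spark.conf.set(
      "spark.databricks.delta.streaming.availableNow.offsetInitializationFix.enabled", "false")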
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
index 51d955d040a..5ce7b737bf8 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -134,15 +134,48 @@ trait DeltaSourceBase extends Source
     }
   }
 
-  protected var lastOffsetForTriggerAvailableNow: DeltaSourceOffset = _
+  private val enableAvailableNowOffsetInitializationFix =
+    spark.sessionState.conf.getConf(DeltaSQLConf.STREAMING_AVAILABLE_NOW_OFFSET_INITIALIZATION_FIX)
+
+  /**
+   * When `AvailableNow` is used, this offset is the upper bound that this run of the query will
+   * process up to. We may run multiple micro-batches, but the query will stop itself when it
+   * reaches this offset.
+   */
+  protected var lastOffsetForTriggerAvailableNow: DeltaSourceOffset = null
+
+  private var isLastOffsetForTriggerAvailableNowInitialized = false
+
+  private var isTriggerAvailableNow = false
 
   override def prepareForTriggerAvailableNow(): Unit = {
-    val offset = latestOffset(null, ReadLimit.allAvailable())
-    if (offset != null) {
-      lastOffsetForTriggerAvailableNow = DeltaSourceOffset(tableId, offset)
+    isTriggerAvailableNow = true
+    if (!enableAvailableNowOffsetInitializationFix) {
+      val offset = latestOffsetInternal(ReadLimit.allAvailable())
+      if (offset != null) {
+        lastOffsetForTriggerAvailableNow = DeltaSourceOffset(tableId, offset)
+      }
+    }
+  }
+
+  /**
+   * Initialize the internal states for AvailableNow the first time this method is called after
+   * `prepareForTriggerAvailableNow`.
+   */
+  protected def initForTriggerAvailableNowIfNeeded(): Unit = {
+    if (enableAvailableNowOffsetInitializationFix && isTriggerAvailableNow &&
+        !isLastOffsetForTriggerAvailableNowInitialized) {
+      isLastOffsetForTriggerAvailableNowInitialized = true
+      val offset = latestOffsetInternal(ReadLimit.allAvailable())
+      if (offset != null) {
+        lastOffsetForTriggerAvailableNow = DeltaSourceOffset(tableId, offset)
+      }
     }
   }
 
+  /** An internal version of `latestOffset`. Methods in this class should call this instead. */
+  protected def latestOffsetInternal(limit: ReadLimit): streaming.Offset
+
   protected def getFileChangesWithRateLimit(
       fromVersion: Long,
       fromIndex: Long,
@@ -420,6 +453,9 @@ case class DeltaSource(
   extends DeltaSourceBase
   with DeltaSourceCDCSupport {
 
+  private val shouldValidateOffsets =
+    spark.sessionState.conf.getConf(DeltaSQLConf.STREAMING_OFFSET_VALIDATION)
+
   // Deprecated. Please use `ignoreDeletes` or `ignoreChanges` from now on.
   private val ignoreFileDeletion = {
     if (options.ignoreFileDeletion) {
@@ -587,7 +623,16 @@ case class DeltaSource(
     new AdmissionLimits().toReadLimit
   }
 
+  /**
+   * This should only be called by the engine. Call `latestOffsetInternal` instead if you need to
+   * get the latest offset.
+   */
   override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
+    initForTriggerAvailableNowIfNeeded()
+    latestOffsetInternal(limit)
+  }
+
+  override protected def latestOffsetInternal(limit: ReadLimit): streaming.Offset = {
     val limits = AdmissionLimits(limit)
 
     val currentOffset = if (previousOffset == null) {
@@ -596,6 +641,11 @@ case class DeltaSource(
       getNextOffsetFromPreviousOffset(previousOffset, limits)
     }
     logDebug(s"previousOffset -> currentOffset: $previousOffset -> $currentOffset")
+    if (shouldValidateOffsets && previousOffset != null) {
+      currentOffset.foreach { current =>
+        DeltaSourceOffset.validateOffsets(previousOffset, DeltaSourceOffset(tableId, current))
+      }
+    }
     currentOffset.orNull
   }
 
@@ -717,6 +767,10 @@ case class DeltaSource(
   override def getBatch(startOffsetOption: Option[Offset], end: Offset): DataFrame = {
     val endOffset = DeltaSourceOffset(tableId, end)
     previousOffset = endOffset // For recovery
+    // We need to initialize after `previousOffset` is set so that we can use `previousOffset` to
+    // know whether we are going to process files in a snapshot or a commit after restart. Even
+    // for the same table, the indexes of a file in a snapshot and a commit are different.
+    initForTriggerAvailableNowIfNeeded()
 
     val (startVersion,
         startIndex,
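Note: the fix relies on call ordering. On a restart, Structured Streaming may call getBatch to replay the last committed batch before it ever calls latestOffset, so the AvailableNow upper bound must be computed only after previousOffset has been restored; both entry points therefore funnel through the same one-time initialization. A simplified sketch of the restart sequence (the engine-side calls and the names lastCommitted, endOffset, readLimit are illustrative, not actual engine code):

    // Illustrative restart sequence under Trigger.AvailableNow:
    source.prepareForTriggerAvailableNow()           // only records isTriggerAvailableNow = true
    source.getBatch(Some(lastCommitted), endOffset)  // restores previousOffset, then initializes
    source.latestOffset(null, readLimit)             // initialization already done; it is a no-op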
" + + s"Previous: $previousOffset, Current: $currentOffset") + } + } } diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala index 928baff0154..a02b106c339 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta import java.io.{File, FileInputStream, OutputStream} import java.net.URI import java.util.UUID +import java.util.concurrent.TimeoutException import scala.concurrent.duration._ import scala.language.implicitConversions @@ -1881,6 +1882,135 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase } } } + + test("DeltaSourceOffset.validateOffsets") { + DeltaSourceOffset.validateOffsets( + previousOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false), + currentOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false) + ) + DeltaSourceOffset.validateOffsets( + previousOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false), + currentOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 5, + index = 1, + isStartingVersion = false) + ) + + assert(intercept[IllegalStateException] { + DeltaSourceOffset.validateOffsets( + previousOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false), + currentOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = true) + ) + }.getMessage.contains("Found invalid offsets: 'isStartingVersion' fliped incorrectly.")) + assert(intercept[IllegalStateException] { + DeltaSourceOffset.validateOffsets( + previousOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false), + currentOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 1, + index = 10, + isStartingVersion = false) + ) + }.getMessage.contains("Found invalid offsets: 'reservoirVersion' moved back.")) + assert(intercept[IllegalStateException] { + DeltaSourceOffset.validateOffsets( + previousOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 10, + isStartingVersion = false), + currentOffset = DeltaSourceOffset( + sourceVersion = 1, + reservoirId = "foo", + reservoirVersion = 4, + index = 9, + isStartingVersion = false) + ) + }.getMessage.contains("Found invalid offsets. 'index' moved back.")) + } + + test("ES-445863: delta source should not hang or reprocess data when using AvailableNow") { + withTempDirs { (inputDir, outputDir, checkpointDir) => + def runQuery(): Unit = { + val q = spark.readStream + .format("delta") + .load(inputDir.getCanonicalPath) + // Require a partition filter. The max index of files matching the partition filter must + // be less than the number of files in the second commit. 
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
index 928baff0154..a02b106c339 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
 import java.io.{File, FileInputStream, OutputStream}
 import java.net.URI
 import java.util.UUID
+import java.util.concurrent.TimeoutException
 
 import scala.concurrent.duration._
 import scala.language.implicitConversions
@@ -1881,6 +1882,135 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
       }
     }
   }
+
+  test("DeltaSourceOffset.validateOffsets") {
+    DeltaSourceOffset.validateOffsets(
+      previousOffset = DeltaSourceOffset(
+        sourceVersion = 1,
+        reservoirId = "foo",
+        reservoirVersion = 4,
+        index = 10,
+        isStartingVersion = false),
+      currentOffset = DeltaSourceOffset(
+        sourceVersion = 1,
+        reservoirId = "foo",
+        reservoirVersion = 4,
+        index = 10,
+        isStartingVersion = false)
+    )
+    DeltaSourceOffset.validateOffsets(
+      previousOffset = DeltaSourceOffset(
+        sourceVersion = 1,
+        reservoirId = "foo",
+        reservoirVersion = 4,
+        index = 10,
+        isStartingVersion = false),
+      currentOffset = DeltaSourceOffset(
+        sourceVersion = 1,
+        reservoirId = "foo",
+        reservoirVersion = 5,
+        index = 1,
+        isStartingVersion = false)
+    )
+
+    assert(intercept[IllegalStateException] {
+      DeltaSourceOffset.validateOffsets(
+        previousOffset = DeltaSourceOffset(
+          sourceVersion = 1,
+          reservoirId = "foo",
+          reservoirVersion = 4,
+          index = 10,
+          isStartingVersion = false),
+        currentOffset = DeltaSourceOffset(
+          sourceVersion = 1,
+          reservoirId = "foo",
+          reservoirVersion = 4,
+          index = 10,
+          isStartingVersion = true)
+      )
+    }.getMessage.contains("Found invalid offsets: 'isStartingVersion' flipped incorrectly."))
+    assert(intercept[IllegalStateException] {
+      DeltaSourceOffset.validateOffsets(
+        previousOffset = DeltaSourceOffset(
+          sourceVersion = 1,
+          reservoirId = "foo",
+          reservoirVersion = 4,
+          index = 10,
+          isStartingVersion = false),
+        currentOffset = DeltaSourceOffset(
+          sourceVersion = 1,
+          reservoirId = "foo",
+          reservoirVersion = 1,
+          index = 10,
+          isStartingVersion = false)
+      )
+    }.getMessage.contains("Found invalid offsets: 'reservoirVersion' moved back."))
+    assert(intercept[IllegalStateException] {
+      DeltaSourceOffset.validateOffsets(
+        previousOffset = DeltaSourceOffset(
+          sourceVersion = 1,
+          reservoirId = "foo",
+          reservoirVersion = 4,
+          index = 10,
+          isStartingVersion = false),
+        currentOffset = DeltaSourceOffset(
+          sourceVersion = 1,
+          reservoirId = "foo",
+          reservoirVersion = 4,
+          index = 9,
+          isStartingVersion = false)
+      )
+    }.getMessage.contains("Found invalid offsets: 'index' moved back."))
+  }
+
+  test("ES-445863: delta source should not hang or reprocess data when using AvailableNow") {
+    withTempDirs { (inputDir, outputDir, checkpointDir) =>
+      def runQuery(): Unit = {
+        val q = spark.readStream
+          .format("delta")
+          .load(inputDir.getCanonicalPath)
+          // Require a partition filter. The max index of files matching the partition filter must
+          // be less than the number of files in the second commit.
+          .where("part = 0")
+          .writeStream
+          .format("delta")
+          .trigger(Trigger.AvailableNow)
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+        try {
+          if (!q.awaitTermination(60000)) {
+            throw new TimeoutException("the query didn't stop in 60 seconds")
+          }
+        } finally {
+          q.stop()
+        }
+      }
+
+      spark.range(0, 1)
+        .selectExpr("id", "id as part")
+        .repartition(10)
+        .write
+        .partitionBy("part")
+        .format("delta")
+        .mode("append")
+        .save(inputDir.getCanonicalPath)
+      runQuery()
+
+      spark.range(1, 10)
+        .selectExpr("id", "id as part")
+        .repartition(9)
+        .write
+        .partitionBy("part")
+        .format("delta")
+        .mode("append")
+        .save(inputDir.getCanonicalPath)
+      runQuery()
+
+      checkAnswer(
+        spark.read.format("delta").load(outputDir.getCanonicalPath),
+        Row(0, 0) :: Nil)
+    }
+  }
 }
 
 /**
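Note: the user-facing pattern the regression test guards is simply re-running an AvailableNow query over a growing table; each run should pick up only newly added files and then stop on its own. A minimal sketch (paths hypothetical):

    import org.apache.spark.sql.streaming.Trigger

    // Illustrative only: a second run must neither hang nor reprocess old files.
    val q = spark.readStream.format("delta").load("/data/events")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "/chk/events")
      .trigger(Trigger.AvailableNow)
      .start("/out/events")
    q.awaitTermination()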