diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/CdcAddFileIndex.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/CdcAddFileIndex.scala
index 2cdea19d4bf..bfc443a2099 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/files/CdcAddFileIndex.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/CdcAddFileIndex.scala
@@ -39,6 +39,9 @@ import org.apache.spark.sql.types.StructType
  * @param path The table's data path.
  * @param snapshot The snapshot where we read CDC from.
  * @param rowIndexFilters Map from URI-encoded file path to a row index filter type.
+ *
+ * Note: Please also consider other CDC-related file indexes like [[TahoeChangeFileIndex]]
+ * and [[TahoeRemoveFileIndex]] when modifying this file index.
  */
 class CdcAddFileIndex(
     spark: SparkSession,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeChangeFileIndex.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeChangeFileIndex.scala
index 044e5ce5023..e7073849860 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeChangeFileIndex.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeChangeFileIndex.scala
@@ -16,6 +16,8 @@
 
 package org.apache.spark.sql.delta.files
 
+import java.text.SimpleDateFormat
+
 import org.apache.spark.sql.delta.{DeltaLog, Snapshot, SnapshotDescriptor}
 import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile}
 import org.apache.spark.sql.delta.commands.cdc.CDCReader.{CDC_COMMIT_TIMESTAMP, CDC_COMMIT_VERSION, CDCDataSpec}
@@ -29,6 +31,9 @@ import org.apache.spark.sql.types.{LongType, StructType, TimestampType}
 /**
  * A [[TahoeFileIndex]] for scanning a sequence of CDC files. Similar to [[TahoeBatchFileIndex]],
  * the equivalent for reading [[AddFile]] actions.
+ *
+ * Note: Please also consider other CDC-related file indexes like [[CdcAddFileIndex]]
+ * and [[TahoeRemoveFileIndex]] when modifying this file index.
  */
 class TahoeChangeFileIndex(
     spark: SparkSession,
@@ -47,9 +52,11 @@ class TahoeChangeFileIndex(
         files.map { f =>
           // We add the metadata as faked partition columns in order to attach it on a per-file
           // basis.
+          val tsOpt = Option(ts)
+            .map(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z").format(_)).orNull
           val newPartitionVals = f.partitionValues +
             (CDC_COMMIT_VERSION -> version.toString) +
-            (CDC_COMMIT_TIMESTAMP -> Option(ts).map(_.toString).orNull)
+            (CDC_COMMIT_TIMESTAMP -> tsOpt)
           AddFile(f.path, newPartitionVals, f.size, 0, dataChange = false, tags = f.tags)
         }
     }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeRemoveFileIndex.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeRemoveFileIndex.scala
index a93e24915ff..aa102c0c8b3 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeRemoveFileIndex.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeRemoveFileIndex.scala
@@ -16,6 +16,8 @@
 
 package org.apache.spark.sql.delta.files
 
+import java.text.SimpleDateFormat
+
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
@@ -36,6 +38,9 @@ import org.apache.spark.sql.types.StructType
  * @param path The table's data path.
  * @param snapshot The snapshot where we read CDC from.
  * @param rowIndexFilters Map from URI-encoded file path to a row index filter type.
+ *
+ * Note: Please also consider other CDC-related file indexes like [[CdcAddFileIndex]]
+ * and [[TahoeChangeFileIndex]] when modifying this file index.
  */
 class TahoeRemoveFileIndex(
     spark: SparkSession,
@@ -62,9 +67,11 @@ class TahoeRemoveFileIndex(
           }
           // We add the metadata as faked partition columns in order to attach it on a per-file
           // basis.
+          val tsOpt = Option(ts)
+            .map(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z").format(_)).orNull
           val newPartitionVals = r.partitionValues +
             (CDC_COMMIT_VERSION -> version.toString) +
-            (CDC_COMMIT_TIMESTAMP -> Option(ts).map(_.toString).orNull) +
+            (CDC_COMMIT_TIMESTAMP -> tsOpt) +
             (CDC_TYPE_COLUMN_NAME -> CDC_TYPE_DELETE_STRING)
           AddFile(
             path = r.path,
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala
index 96622be0a84..f8bde8d239f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.LongType
 
@@ -218,43 +219,46 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
 
   test("resolve expression for timestamp function") {
     val tbl = "tbl"
-    withTable(tbl) {
-      createTblWithThreeVersions(tblName = Some(tbl))
-
-      val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl))
-
-      val currentTime = new Date().getTime
-      modifyDeltaTimestamp(deltaLog, 0, currentTime - 100000)
-      modifyDeltaTimestamp(deltaLog, 1, currentTime)
-      modifyDeltaTimestamp(deltaLog, 2, currentTime + 100000)
-
-      val readDf = sql(s"SELECT * FROM table_changes('$tbl', 0, now())")
-      checkCDCAnswer(
-        DeltaLog.forTable(spark, TableIdentifier("tbl")),
-        readDf,
-        spark.range(20)
-          .withColumn("_change_type", lit("insert"))
-          .withColumn("_commit_version", (col("id") / 10).cast(LongType))
+    withDefaultTimeZone(UTC) {
+      withTable(tbl) {
+        createTblWithThreeVersions(tblName = Some(tbl))
+
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl))
+
+        val currentTime = new Date().getTime
+        modifyDeltaTimestamp(deltaLog, 0, currentTime - 100000)
+        modifyDeltaTimestamp(deltaLog, 1, currentTime)
+        modifyDeltaTimestamp(deltaLog, 2, currentTime + 100000)
+
+        val readDf = sql(s"SELECT * FROM table_changes('$tbl', 0, now())")
+        checkCDCAnswer(
+          DeltaLog.forTable(spark, TableIdentifier("tbl")),
+          readDf,
+          spark.range(20)
+            .withColumn("_change_type", lit("insert"))
+            .withColumn("_commit_version", (col("id") / 10).cast(LongType))
       )
-      // more complex expression
-      val readDf2 = sql(s"SELECT * FROM table_changes('$tbl', 0, now() + interval 5 seconds)")
-      checkCDCAnswer(
-        DeltaLog.forTable(spark, TableIdentifier("tbl")),
-        readDf2,
-        spark.range(20)
-          .withColumn("_change_type", lit("insert"))
-          .withColumn("_commit_version", (col("id") / 10).cast(LongType))
-      )
-      val readDf3 = sql("SELECT * FROM table_changes" +
-        s"('$tbl', string(date_sub(current_date(), 1)), string(now()))")
-      checkCDCAnswer(
-        DeltaLog.forTable(spark, TableIdentifier("tbl")),
-        readDf2,
-        spark.range(20)
-          .withColumn("_change_type", lit("insert"))
-          .withColumn("_commit_version", (col("id") / 10).cast(LongType))
-      )
+        // more complex expression
+        val readDf2 =
+          sql(s"SELECT * FROM table_changes('$tbl', 0, now() + interval 5 seconds)")
+        checkCDCAnswer(
+          DeltaLog.forTable(spark, TableIdentifier("tbl")),
+          readDf2,
+          spark.range(20)
+            .withColumn("_change_type", lit("insert"))
+            .withColumn("_commit_version", (col("id") / 10).cast(LongType))
+        )
+
+        val readDf3 = sql("SELECT * FROM table_changes" +
+          s"('$tbl', string(date_sub(current_date(), 1)), string(now()))")
+        checkCDCAnswer(
+          DeltaLog.forTable(spark, TableIdentifier("tbl")),
+          readDf3,
+          spark.range(20)
+            .withColumn("_change_type", lit("insert"))
+            .withColumn("_commit_version", (col("id") / 10).cast(LongType))
+        )
+      }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
index c4a6814e10f..e9f2b7886a6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
@@ -34,7 +34,8 @@ import org.apache.spark.sql.delta.util.FileNames
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.functions.{col, current_timestamp, floor, lit}
+import org.apache.spark.sql.functions.{col, current_timestamp, floor, lit, unix_timestamp}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryException
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{LongType, StringType, StructType}
@@ -62,7 +63,7 @@ abstract class DeltaCDCSuiteBase
   case class EndingVersion(value: String) extends Boundary
   case class EndingTimestamp(value: String) extends Boundary
   case object Unbounded extends Boundary // used to model situation when a boundary isn't provided
-  val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+  val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z")
 
   def createTblWithThreeVersions(
       tblName: Option[String] = None,
@@ -250,13 +251,11 @@ abstract class DeltaCDCSuiteBase
       // modify timestamps
       // version 0
       modifyDeltaTimestamp(deltaLog, 0, 0)
-      val tsAfterV0 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-        .format(new Date(1))
+      val tsAfterV0 = dateFormat.format(new Date(1))
 
       // version 1
       modifyDeltaTimestamp(deltaLog, 1, 1000)
-      val tsAfterV1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-        .format(new Date(1001))
+      val tsAfterV1 = dateFormat.format(new Date(1001))
 
       modifyDeltaTimestamp(deltaLog, 2, 2000)
 
@@ -447,8 +446,7 @@ abstract class DeltaCDCSuiteBase
 
       // Set commit time during Daylight savings time change.
       val restoreDate = "2022-11-06 01:42:44"
-      val format = new java.text.SimpleDateFormat("yyyy-MM-dd hh:mm:ss Z")
-      val timestamp = format.parse(s"$restoreDate -0800").getTime
+      val timestamp = dateFormat.parse(s"$restoreDate -0800").getTime
       modifyDeltaTimestamp(deltaLog, 0, timestamp)
 
       // Verify DST is respected.
@@ -477,6 +475,66 @@ abstract class DeltaCDCSuiteBase
     }
   }
 
+  test("CDC read's commit timestamps are correct under different timezones") {
+    val tblName = "tbl"
+    withTable(tblName) {
+      spark.sql(s"CREATE OR REPLACE TABLE $tblName(id INT, name STRING, age INT) " +
+        s"USING DELTA TBLPROPERTIES (delta.enableChangeDataFeed = true)")
+      spark.sql(s"INSERT INTO $tblName(id, name, age) VALUES (1,'abc',20)")
+      spark.sql(s"INSERT INTO $tblName(id, name, age) VALUES (2,'def',21)")
+      spark.sql(s"UPDATE $tblName SET age = 19 WHERE id = 1")
+      spark.sql(s"INSERT INTO $tblName(id, name, age) VALUES (3,'ghi',15)")
+      spark.sql(s"DELETE FROM $tblName WHERE id = 3")
+
+      // unix_timestamp() on a Timestamp column returns the UNIX timestamp of the specified
+      // time under the given SESSION_LOCAL_TIMEZONE, while collect() on a timestamp column
+      // always returns the Timestamp in UTC.
+      // By using unix_timestamp() on the commit timestamp column, we can accurately determine
+      // whether or not the timestamps under different timezones represent the same point in time.
+      val startingVersion = StartingVersion("0")
+      val endingVersion = EndingVersion("10")
+      spark.conf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, "America/Chicago")
+      val readDfChicago = cdcRead(new TableName(tblName), startingVersion, endingVersion)
+        .orderBy(CDC_COMMIT_VERSION, CDC_TYPE_COLUMN_NAME)
+        .select(col(CDC_COMMIT_VERSION), col(CDC_TYPE_COLUMN_NAME),
+          unix_timestamp(col(CDC_COMMIT_TIMESTAMP)))
+      val readDfChicagoRows = readDfChicago.collect()
+
+      spark.conf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, "Asia/Ho_Chi_Minh")
+      val readDfHCM = cdcRead(new TableName(tblName), startingVersion, endingVersion)
+        .orderBy(CDC_COMMIT_VERSION, CDC_TYPE_COLUMN_NAME)
+        .select(col(CDC_COMMIT_VERSION), col(CDC_TYPE_COLUMN_NAME),
+          unix_timestamp(col(CDC_COMMIT_TIMESTAMP)))
+      val readDfHCMRows = readDfHCM.collect()
+
+      spark.conf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, "UTC")
+      val readDfUTC = cdcRead(new TableName(tblName), startingVersion, endingVersion)
+        .orderBy(CDC_COMMIT_VERSION, CDC_TYPE_COLUMN_NAME)
+        .select(col(CDC_COMMIT_VERSION), col(CDC_TYPE_COLUMN_NAME),
+          unix_timestamp(col(CDC_COMMIT_TIMESTAMP)))
+      val readDfUTCRows = readDfUTC.collect()
+
+      def checkCDCTimestampEqual(firstRows: Array[Row], secondRows: Array[Row]): Boolean = {
+        assert(firstRows.length === secondRows.length,
+          "Number of rows from 2 DFs should be the same.")
+        for ((firstRow, secondRow) <- firstRows.zip(secondRows)) {
+          assert(firstRow.getLong(0) === secondRow.getLong(0),
+            "Commit version should be the same for every row.")
+          assert(firstRow.getString(1) === secondRow.getString(1),
+            "Change type should be the same for every row.")
+          if (firstRow.getLong(2) != secondRow.getLong(2)) {
+            return false
+          }
+        }
+        true
+      }
+
+      assert(checkCDCTimestampEqual(readDfChicagoRows, readDfHCMRows) === true)
+      assert(checkCDCTimestampEqual(readDfChicagoRows, readDfUTCRows) === true)
+      assert(checkCDCTimestampEqual(readDfHCMRows, readDfUTCRows) === true)
+    }
+  }
+
   test("start version is provided and no end version") {
     val tblName = "tbl"
     withTable(tblName) {
@@ -742,11 +800,8 @@ abstract class DeltaCDCSuiteBase
 
       // version 2
      modifyDeltaTimestamp(deltaLog, 2, 2000)
-
-      val tsStart = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-        .format(new Date(3000))
-      val tsEnd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-        .format(new Date(4000))
+      val tsStart = dateFormat.format(new Date(3000))
+      val tsEnd = dateFormat.format(new Date(4000))
 
       val readDf = cdcRead(
        new TablePath(path),
@@ -776,10 +831,8 @@ abstract class DeltaCDCSuiteBase
 
      // version 2
      modifyDeltaTimestamp(deltaLog, 2, 2000)
-      val tsStart = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-        .format(new Date(0))
-      val tsEnd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-        .format(new Date(4000))
+      val tsStart = dateFormat.format(new Date(0))
+      val tsEnd = dateFormat.format(new Date(4000))
 
       val readDf = cdcRead(
        new TablePath(tempDir.getAbsolutePath),
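
The core change in the file index classes above swaps Option(ts).map(_.toString) for a SimpleDateFormat with the pattern "yyyy-MM-dd HH:mm:ss.SSS Z" when faking the commit timestamp partition value. java.sql.Timestamp.toString renders the instant in the JVM default timezone with no offset, so the same commit time can serialize to different strings on differently configured machines; the "Z" pattern letter appends the numeric offset and keeps the string unambiguous. A minimal standalone sketch of that difference follows; the object name and the fixed epoch value are illustrative only, not part of the patch.

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Illustrative sketch (not from the patch): the same instant rendered with and
// without an explicit offset under two different JVM default timezones.
object OffsetFormatSketch {
  def main(args: Array[String]): Unit = {
    val instant = new Date(1667727764000L) // an arbitrary fixed instant

    for (zone <- Seq("UTC", "America/Chicago")) {
      TimeZone.setDefault(TimeZone.getTimeZone(zone))
      // SimpleDateFormat picks up the default timezone at construction time,
      // so the formatters are rebuilt after each switch.
      val withoutOffset = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(instant)
      val withOffset = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z").format(instant)
      // The offset-free rendering changes with the default timezone; the "Z"
      // rendering still identifies the same instant because it carries the offset.
      println(s"$zone: without offset = $withoutOffset, with offset = $withOffset")
    }
  }
}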
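The new DeltaCDCSuite test compares unix_timestamp(col(CDC_COMMIT_TIMESTAMP)) values collected under three different spark.sql.session.timeZone settings. A minimal sketch of the same comparison technique outside Delta is below; the object name, the local-mode session, the column name, and the fixed instant are assumptions for illustration, not code from the PR.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, unix_timestamp}

// Illustrative sketch: unix_timestamp() on a TimestampType column yields epoch
// seconds, which do not depend on the session timezone, so equal values across
// timezone settings mean the column denotes the same point in time.
object SessionTimezoneSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("tz-sketch").getOrCreate()
    import spark.implicits._

    // One fixed instant stored in a TimestampType column.
    val df = Seq(new Timestamp(1667727764000L)).toDF("commit_ts")

    def epochSeconds(tz: String): Long = {
      spark.conf.set("spark.sql.session.timeZone", tz)
      df.select(unix_timestamp(col("commit_ts"))).head().getLong(0)
    }

    val values = Seq("UTC", "America/Chicago", "Asia/Ho_Chi_Minh").map(epochSeconds)
    assert(values.distinct.size == 1, s"expected one distinct value, got $values")
    println(s"epoch seconds under all three zones: $values")

    spark.stop()
  }
}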
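The suite-wide dateFormat now carries a trailing "Z", matching the existing DST test that parses "$restoreDate -0800" with an explicit offset. A small illustrative sketch (the object name and the assertion are mine, not from the patch) of why the offset matters around the 2022-11-06 US Pacific fall-back, when the wall-clock reading 01:42:44 occurs twice:

import java.text.SimpleDateFormat

// Illustrative sketch: an explicit offset in the pattern pins down which of the
// two occurrences of an ambiguous fall-back wall-clock reading is meant.
object DstOffsetSketch {
  def main(args: Array[String]): Unit = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z")
    val beforeFallBack = fmt.parse("2022-11-06 01:42:44 -0700").getTime // PDT occurrence
    val afterFallBack = fmt.parse("2022-11-06 01:42:44 -0800").getTime  // PST occurrence
    // Same local reading, two distinct instants exactly one hour apart.
    assert(afterFallBack - beforeFallBack == 3600000L)
    println(s"difference in millis: ${afterFallBack - beforeFallBack}")
  }
}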