Skip to content

Commit

Permalink
[Spark] CDC reader accept case insensitive Boolean option values (#3584)
Browse files Browse the repository at this point in the history
## Description

`DeltaOptions` is equipped to accept case-insensitive values of boolean
flags, but CDCReader was not, resulting in it not accepting 'True'. Make
it case-insensitive.

A separate bug in Spark Connect was causing "True" to be passed from
Python boolean True. That is being fixed by
apache/spark#47790

## How was this patch tested?

Tests added.

## Does this PR introduce _any_ user-facing changes?

The data source option to enable CDC should now accept "True" and other
mixed-case variants, not only "true".

---------

Co-authored-by: Julek Sompolski <Juliusz Sompolski>
  • Loading branch information
juliuszsompolski authored Aug 21, 2024
1 parent 01bf607 commit cfb0292
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 2 deletions.
35 changes: 35 additions & 0 deletions python/delta/tests/test_deltatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,41 @@ def test_history(self) -> None:
[Row("Overwrite")],
StructType([StructField("operationParameters.mode", StringType(), True)]))

def test_cdc(self) -> None:
    """Change Data Feed reads accept the option as a bool or any casing of 'true'."""
    self.spark.range(0, 5).write.format("delta").save(self.tempFile)
    deltaTable = DeltaTable.forPath(self.spark, self.tempFile)
    # Enable Change Data Feed so subsequent operations record change rows.
    self.spark.sql(
        "ALTER TABLE delta.`{}` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
        .format(self.tempFile))

    # Perform some operations, each of which produces CDC rows.
    deltaTable.update("id = 1", {"id": "10"})
    deltaTable.delete("id = 2")
    self.spark.range(5, 10).write.format("delta").mode("append").save(self.tempFile)

    # Expected (id, _change_type) pairs accumulated across the operations above.
    expected = [
        (1, "update_preimage"),
        (10, "update_postimage"),
        (2, "delete"),
        (5, "insert"),
        (6, "insert"),
        (7, "insert"),
        (8, "insert"),
        (9, "insert")
    ]
    # Read the Change Data Feed, passing the option as a Python boolean and as
    # strings in different cases; every variant must be accepted.
    # subTest makes a failure report which variant broke instead of aborting the loop.
    for option in [True, "true", "tRuE"]:
        with self.subTest(option=option):
            cdf = self.spark.read.format("delta") \
                .option("readChangeData", option) \
                .option("startingVersion", "1") \
                .load(self.tempFile)

            result = [(row.id, row._change_type) for row in cdf.collect()]
            self.assertEqual(sorted(result), sorted(expected))

def test_detail(self) -> None:
self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3)])
dt = DeltaTable.forPath(self.spark, self.tempFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.commands.cdc
import java.sql.Timestamp

import scala.collection.mutable.{ListBuffer, Map => MutableMap}
import scala.util.Try

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions._
Expand Down Expand Up @@ -973,11 +974,20 @@ trait CDCReaderImpl extends DeltaLogging {
* Based on the read options passed it indicates whether the read was a cdc read or not.
*/
/**
 * Based on the read options passed it indicates whether the read was a cdc read or not.
 */
def isCDCRead(options: CaseInsensitiveStringMap): Boolean = {
  // Mirrors the value parsing of DeltaOptions.readChangeFeed, but CDCReader
  // receives a CaseInsensitiveStringMap rather than the CaseInsensitiveMap
  // used by DataFrameReader, so the boolean handling is reimplemented here.
  // Accepts any casing of "true"/"false"; anything else is rejected.
  def parseBooleanOption(key: String): Boolean = {
    val raw = options.get(key)
    Try(raw.toBoolean).getOrElse {
      throw DeltaErrors.illegalDeltaOptionException(key, raw, "must be 'true' or 'false'")
    }
  }

  val cdcEnabled =
    options.containsKey(DeltaDataSource.CDC_ENABLED_KEY) &&
      parseBooleanOption(DeltaDataSource.CDC_ENABLED_KEY)

  val cdcLegacyConfEnabled =
    options.containsKey(DeltaDataSource.CDC_ENABLED_KEY_LEGACY) &&
      parseBooleanOption(DeltaDataSource.CDC_ENABLED_KEY_LEGACY)

  cdcEnabled || cdcLegacyConfEnabled
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,35 @@ class DeltaCDCScalaSuite extends DeltaCDCSuiteBase {
}
}
}

test("reader should accept case insensitive option") {
  val tblName = "tbl"
  withTable(tblName) {
    createTblWithThreeVersions(tblName = Some(tblName))

    // Exercise both the current and the legacy CDC option name, with
    // mixed-case option keys and values, to verify end-to-end that the
    // reader handles them case-insensitively.
    // Tuple layout: (cdc option key, cdc value, startingVersion key, endingVersion key).
    val optionCasings = Seq(
      ("ReadChangeFEED", "tRuE", "STARTINGVERSION", "endingVersion"),
      ("READCHANGEDATA", "TruE", "startingversion", "ENDINGVERSION"))

    for ((cdcKey, cdcValue, startKey, endKey) <- optionCasings) {
      val res = spark.read.format("delta")
        .option(cdcKey, cdcValue)
        .option(startKey, 0)
        .option(endKey, 1)
        .table(tblName)
        .select("id", "_change_type")
      assert(res.columns === Seq("id", "_change_type"))
      checkAnswer(
        res,
        spark.range(20).withColumn("_change_type", lit("insert")))
    }
  }
}

}

class DeltaCDCScalaWithDeletionVectorsSuite extends DeltaCDCScalaSuite
Expand Down

0 comments on commit cfb0292

Please # to comment.