[Spark] Only enable a single legacy feature with legacy metadata properties (#3657)

#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Currently, enabling a legacy feature on a legacy protocol with a metadata
property results in enabling all preceding legacy features as well. For
example, enabling `enableChangeDataFeed` results in protocol (1, 4), which
implicitly enables every legacy writer feature up to writer version 4. This is
inconsistent with the rest of the protocol operations. In this PR, we fix this
inconsistency by always enabling only the requested feature. This is a
behavioral change.
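
For contrast, the table-features path is already consistent: marking a single
feature as supported enables only that feature. A minimal sketch, assuming a
SparkSession `spark` and a hypothetical table name (the starting protocol
depends on session defaults):

```scala
// Hypothetical example, not from this PR's diff.
spark.sql("CREATE TABLE tbl (id LONG) USING DELTA") // e.g. starts at (1, 2)

// The table-features property enables only the named feature:
spark.sql(
  """ALTER TABLE tbl SET TBLPROPERTIES (
    |  'delta.feature.changeDataFeed' = 'supported'
    |)""".stripMargin)
// Result: a table-features protocol supporting changeDataFeed only --
// not changeDataFeed plus all preceding legacy features.
```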

## How was this patch tested?

Existing and new unit tests.

## Does this PR introduce _any_ user-facing changes?

Yes. When enabling a feature using a table property, e.g. by setting
`delta.enableChangeDataFeed` to `true`, you would previously get protocol
`(1, 4)`. Now you get `(1, 7, changeDataFeed)`. The user can still get
`(1, 4)` by also setting `delta.minWriterVersion = 4`. This change is
acceptable now because (a) enabling fewer features is safer than enabling
more features, (b) Deletion Vectors require table features support and are
very popular to implement, so many clients have already added support for
table features, and (c) users can easily get back to the legacy protocol by
ALTERing the table and setting `delta.minWriterVersion = 4`.
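
A sketch of the before/after behavior described above, assuming a
SparkSession `spark` and a hypothetical table name (the protocol values are
taken from the description, not from running the code):

```scala
// Hypothetical example illustrating the behavioral change.
spark.sql("CREATE TABLE events (id LONG) USING DELTA")
spark.sql(
  "ALTER TABLE events SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')")
// Before this PR: protocol (1, 4) -- changeDataFeed plus all preceding
//                 legacy features.
// After this PR:  protocol (1, 7, changeDataFeed) -- only the requested
//                 feature.

// Opting back into the legacy protocol by pinning the writer version:
spark.sql(
  """ALTER TABLE events SET TBLPROPERTIES (
    |  'delta.enableChangeDataFeed' = 'true',
    |  'delta.minWriterVersion' = '4'
    |)""".stripMargin)
// Result: protocol (1, 4).
```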

Signed-off-by: Bart Samwel <bart.samwel@databricks.com>
bart-samwel authored Sep 9, 2024
1 parent ac667ff commit 15da4aa
Showing 12 changed files with 200 additions and 153 deletions.
@@ -85,7 +85,10 @@ trait DeltaSharingTestSparkUtils extends DeltaSQLTestUtils {

protected def createSimpleTable(tableName: String, enableCdf: Boolean): Unit = {
val tablePropertiesStr = if (enableCdf) {
"TBLPROPERTIES (delta.enableChangeDataFeed = true)"
"""TBLPROPERTIES (
|delta.minReaderVersion=1,
|delta.minWriterVersion=4,
|delta.enableChangeDataFeed = true)""".stripMargin
} else {
""
}
@@ -322,12 +322,12 @@ object Protocol {
*
* This function returns the protocol versions and features individually instead of a
* [[Protocol]], so the caller can identify the features that caused the protocol version. For
* example, if the return values are (2, 5, columnMapping), the caller can safely ignore all
* other features required by the protocol with a reader and writer version of 2 and 5.
* example, if the return values are (2, 5, columnMapping + preceding features), the caller
* can safely ignore all other features required by the protocol with a reader and writer
* version of 2 and 5.
*
* Note that this method does not consider protocol versions and features configured in session
* defaults. To make them effective, copy them to `metadata` using
* [[DeltaConfigs.mergeGlobalConfigs]].
* Note that this method does not consider features configured in session defaults.
* To make them effective, copy them to `metadata` using [[DeltaConfigs.mergeGlobalConfigs]].
*/
def minProtocolComponentsFromMetadata(
spark: SparkSession,
@@ -343,46 +343,11 @@ object Protocol {
spark, metadata, Protocol().withFeatures(tablePropEnabledFeatures))
val allEnabledFeatures = tablePropEnabledFeatures ++ metaEnabledFeatures

// Determine the min reader and writer version required by features in table properties or
// metadata.
// If any table property is specified:
// we start from (3, 7) or (0, 7) depending on the existence of any writer-only feature.
// If there's no table property:
// if no feature is enabled or all features are legacy, we start from (0, 0);
// if any feature is native and is reader-writer, we start from (3, 7);
// otherwise we start from (0, 7) because there must exist a native writer-only feature.
var (readerVersionFromFeatures, writerVersionFromFeatures) = {
if (tablePropEnabledFeatures.exists(_.isReaderWriterFeature)) {
(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
} else if (tablePropEnabledFeatures.nonEmpty) {
(0, TABLE_FEATURES_MIN_WRITER_VERSION)
} else if (metaEnabledFeatures.forall(_.isLegacyFeature)) { // also true for empty set
(0, 0)
} else if (metaEnabledFeatures.exists(f => !f.isLegacyFeature && f.isReaderWriterFeature)) {
(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
} else {
(0, TABLE_FEATURES_MIN_WRITER_VERSION)
}
}
allEnabledFeatures.foreach { feature =>
readerVersionFromFeatures = math.max(readerVersionFromFeatures, feature.minReaderVersion)
writerVersionFromFeatures = math.max(writerVersionFromFeatures, feature.minWriterVersion)
}

// Protocol version provided in table properties can upgrade the protocol, but only when they
// are higher than which required by the enabled features.
val (readerVersionFromTableConfOpt, writerVersionFromTableConfOpt) =
getProtocolVersionsFromTableConf(tableConf)

// Decide the final protocol version:
// a. 1, aka the lowest version possible
// b. version required by manually enabled features and metadata features
// c. version defined as table properties
val finalReaderVersion =
Seq(1, readerVersionFromFeatures, readerVersionFromTableConfOpt.getOrElse(0)).max
val finalWriterVersion =
Seq(1, writerVersionFromFeatures, writerVersionFromTableConfOpt.getOrElse(0)).max

// If the user explicitly sets the table versions, we need to take into account the
// relevant implicit features.
val implicitFeaturesFromTableConf =
@@ -399,7 +364,14 @@ object Protocol {
case _ => Set.empty
}

(finalReaderVersion, finalWriterVersion, allEnabledFeatures ++ implicitFeaturesFromTableConf)
// Construct the minimum required protocol for the enabled features.
val minProtocol = Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
.withFeatures(allEnabledFeatures ++ implicitFeaturesFromTableConf)
.normalized

// Return the minimum protocol components.
(minProtocol.minReaderVersion, minProtocol.minWriterVersion,
minProtocol.implicitlyAndExplicitlySupportedFeatures)
}
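
The heart of the fix is `normalized`: build a table-features protocol that
supports exactly the requested features, then reduce it to the weakest
equivalent representation. A sketch of the expected results, based on the
examples in this PR's description (the outputs shown are assumptions drawn
from that description, not from running the code):

```scala
// changeDataFeed is writer-only, so the normalized protocol is assumed to
// drop the reader version to 1 and keep the table-features writer version:
Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
  .withFeatures(Seq(ChangeDataFeedTableFeature))
  .normalized
// => (1, 7, changeDataFeed)

// A feature set that exactly matches a legacy protocol is assumed to
// denormalize back to the legacy versions:
Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
  .withFeatures(Seq(AppendOnlyTableFeature, InvariantsTableFeature))
  .normalized
// => (1, 2)
```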

/**
@@ -488,32 +460,12 @@ object Protocol {
spark: SparkSession,
metadata: Metadata,
current: Protocol): Option[Protocol] = {
val (readerVersion, writerVersion, minRequiredFeatures) =
minProtocolComponentsFromAutomaticallyEnabledFeatures(spark, metadata, current)

// If the user sets the protocol versions we need to take it account. In general,
// enabling legacy features on legacy protocols results to pumping up the protocol
// versions. However, setting table feature protocol versions while enabling
// legacy features results to only enabling the requested features. For example:
// 1) Create table with (1, 2), then ALTER TABLE with DeltaConfigs.CHANGE_DATA_FEED.key = true
// results to (1, 4).
// 2) Alternatively, Create table with (1, 2), then
// ALTER TABLE set versions (1, 7) and DeltaConfigs.CHANGE_DATA_FEED.key = true results
// to (1, 7, AppendOnly, Invariants, CDF).
val readerVersionFromConf =
Protocol.getReaderVersionFromTableConf(metadata.configuration).getOrElse(readerVersion)
val writerVersionFromConf =
Protocol.getWriterVersionFromTableConf(metadata.configuration).getOrElse(writerVersion)

val finalReaderVersion =
Seq(readerVersion, readerVersionFromConf, current.minReaderVersion).max
val finalWriterVersion =
Seq(writerVersion, writerVersionFromConf, current.minWriterVersion).max

// Increment the reader and writer version to accurately add enabled legacy table features
// either to the implicitly enabled table features or the table feature lists.

val required =
Protocol(finalReaderVersion, finalWriterVersion).withFeatures(minRequiredFeatures)
Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
.withFeatures(extractAutomaticallyEnabledFeatures(spark, metadata, current))
.normalized

if (!required.canUpgradeTo(current)) {
// When the current protocol does not satisfy metadata requirement, some additional features
// must be supported by the protocol. We assert those features can actually perform the
@@ -308,9 +308,16 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
// We set CDC to be enabled by default, so this should automatically bump the writer protocol
// to the required version.
if (columnMappingEnabled) {
assert(log.snapshot.protocol == Protocol(2, 5))
assert(log.update().protocol == Protocol(2, 7).withFeatures(Seq(
AppendOnlyTableFeature,
InvariantsTableFeature,
ChangeDataFeedTableFeature,
ColumnMappingTableFeature)))
} else {
assert(log.snapshot.protocol == Protocol(1, 4))
assert(log.update().protocol == Protocol(1, 7).withFeatures(Seq(
AppendOnlyTableFeature,
InvariantsTableFeature,
ChangeDataFeedTableFeature)))
}
}
}
@@ -482,12 +482,13 @@ class DeltaColumnMappingSuite extends QueryTest
expectedSchema: StructType,
ignorePhysicalName: Boolean,
mode: String,
createNewTable: Boolean = true)(fn: => Unit): Unit = {
createNewTable: Boolean = true,
tableFeaturesProtocolExpected: Boolean = true)(fn: => Unit): Unit = {
withTable(tableName) {
fn
checkProperties(tableName,
readerVersion = 2,
writerVersion = 5,
writerVersion = if (tableFeaturesProtocolExpected) 7 else 5,
mode = Some(mode),
curMaxId = DeltaColumnMapping.findMaxColumnId(expectedSchema)
)
@@ -826,7 +827,7 @@ class DeltaColumnMappingSuite extends QueryTest
checkSchema("t1", schemaWithId)
checkProperties("t1",
readerVersion = 2,
writerVersion = 5,
writerVersion = 7,
mode = Some(mode),
curMaxId = DeltaColumnMapping.findMaxColumnId(schemaWithId)
)
@@ -849,7 +850,7 @@ class DeltaColumnMappingSuite extends QueryTest

checkProperties("t1",
readerVersion = 2,
writerVersion = 5,
writerVersion = 7,
mode = Some(mode),
curMaxId = DeltaColumnMapping.findMaxColumnId(schemaWithIdNested))
checkSchema(
@@ -871,7 +872,7 @@ class DeltaColumnMappingSuite extends QueryTest

checkProperties("t1",
readerVersion = 2,
writerVersion = 5,
writerVersion = 7,
mode = Some(mode),
curMaxId = curMaxId)

@@ -886,7 +887,7 @@ class DeltaColumnMappingSuite extends QueryTest
)
checkProperties("t1",
readerVersion = 2,
writerVersion = 5,
writerVersion = 7,
mode = Some(mode),
curMaxId = curMaxId2)
checkSchema("t1",
@@ -938,7 +939,7 @@ class DeltaColumnMappingSuite extends QueryTest

checkProperties("t1",
readerVersion = 2,
writerVersion = 5,
writerVersion = 7,
mode = Some(mode),
curMaxId = curMaxId)
checkSchema("t1",
@@ -960,7 +961,7 @@ class DeltaColumnMappingSuite extends QueryTest
)
checkProperties("t1",
readerVersion = 2,
writerVersion = 5,
writerVersion = 7,
mode = Some(mode),
curMaxId = curMaxId2)
checkSchema("t1",
@@ -998,7 +999,7 @@ class DeltaColumnMappingSuite extends QueryTest

checkProperties("t1",
readerVersion = 2,
writerVersion = 5,
writerVersion = 7,
mode = Some(mode),
curMaxId = curMaxId)
checkSchema("t1", schemaWithId)
@@ -1013,7 +1014,7 @@ class DeltaColumnMappingSuite extends QueryTest

checkProperties("t1",
readerVersion = 2,
writerVersion = 5,
writerVersion = 7,
mode = Some(mode),
curMaxId = curMaxId)

@@ -1037,7 +1038,7 @@ class DeltaColumnMappingSuite extends QueryTest
val curMaxId2 = DeltaColumnMapping.findMaxColumnId(schemaWithId) + 1
checkProperties("t1",
readerVersion = 2,
writerVersion = 5,
writerVersion = 7,
mode = Some(mode),
curMaxId = curMaxId2)
checkSchema("t1", schemaWithId.add("c", StringType, true, withId(3)))
@@ -1627,7 +1628,8 @@ class DeltaColumnMappingSuite extends QueryTest
schemaWithDottedColumnNames,
false,
"name",
createNewTable = false
createNewTable = false,
tableFeaturesProtocolExpected = false
) {
sql(s"CREATE TABLE t1 (${schemaWithDottedColumnNames.toDDL}) USING DELTA")
alterTableWithProps("t1", props = Map(
@@ -169,25 +169,25 @@ trait DeltaDDLUsingPathTests extends QueryTest
"key" -> "value")
}

val protocol = Protocol.forNewTable(spark, Some(metadata))
val supportedFeatures = protocol
.readerAndWriterFeatureNames
.map(name => s"delta.feature.$name" -> "supported")
val expectedProperties = Seq(
"delta.logRetentionDuration" -> "2 weeks",
"delta.minReaderVersion" -> protocol.minReaderVersion.toString,
"delta.minWriterVersion" -> protocol.minWriterVersion.toString,
"key" -> "value") ++ supportedFeatures

checkDatasetUnorderly(
dropColumnMappingConfigurations(
sql(s"SHOW TBLPROPERTIES $table").as[(String, String)]),
"delta.logRetentionDuration" -> "2 weeks",
"delta.minReaderVersion" ->
Protocol.forNewTable(spark, Some(metadata)).minReaderVersion.toString,
"delta.minWriterVersion" ->
Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString,
"key" -> "value")
expectedProperties: _*)

checkDatasetUnorderly(
dropColumnMappingConfigurations(
sql(s"SHOW TBLPROPERTIES delta.`$path`").as[(String, String)]),
"delta.logRetentionDuration" -> "2 weeks",
"delta.minReaderVersion" ->
Protocol.forNewTable(spark, Some(metadata)).minReaderVersion.toString,
"delta.minWriterVersion" ->
Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString,
"key" -> "value")
expectedProperties: _*)

if (table == "`delta_test`") {
val tableName = s"$catalogName.default.delta_test"