From 4ee7f4d669558b3d19785152298c899f70d7b055 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou <93710326+andreaschat-db@users.noreply.github.com> Date: Mon, 13 May 2024 18:29:07 +0200 Subject: [PATCH] [Spark] Protocol version downgrade in the presence of table features (#2841) ## Description This PR adds support for downgrading protocol versions when table features exist in the protocol. The downgraded protocol versions should be the minimum required to support all available table features. For example, `Protocol(3, 7, DeletionVectors, RowTracking)` can be downgraded to `Protocol(1, 7, RowTracking)` after removing the DV feature. ## How was this patch tested? Added new UTs in DeltaProtocolVersionSuite. Furthermore, existing UTs cover a significant part of the functionality. These are the following: - Downgrade protocol version on table created with (3, 7). - Downgrade protocol version on table created with (1, 7). - Protocol version downgrade on a table with table features and added legacy feature. - Protocol version is not downgraded when writer features exist. - Protocol version is not downgraded when reader+writer features exist. - Protocol version is not downgraded when multiple reader+writer features exist. ## Does this PR introduce _any_ user-facing changes? Yes. Dropping a table feature from a table with multiple features may now result in a downgrade of the protocol versions. For example, `Protocol(3, 7, DeletionVectors, RowTracking)` can now be downgraded to `Protocol(1, 7, RowTracking)`. 
--- .../delta/actions/TableFeatureSupport.scala | 67 ++++++++---------- .../sql/delta/DeltaProtocolVersionSuite.scala | 69 +++++++++++++++---- .../sql/delta/DeltaTableFeatureSuite.scala | 27 ++------ 3 files changed, 88 insertions(+), 75 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala index c9c84547aae..8ffd383b498 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala @@ -188,6 +188,14 @@ trait TableFeatureSupport { this: Protocol => lazy val readerAndWriterFeatures: Seq[TableFeature] = readerAndWriterFeatureNames.toSeq.flatMap(TableFeature.featureNameToFeature) + /** + * A sequence of native [[TableFeature]]s. This is derived by filtering out all explicitly + * supported legacy features. + */ + @JsonIgnore + lazy val nativeReaderAndWriterFeatures: Seq[TableFeature] = + readerAndWriterFeatures.filterNot(_.isLegacyFeature) + /** * Get all features that are implicitly supported by this protocol, for example, `Protocol(1,2)` * implicitly supports `appendOnly` and `invariants`. When this protocol is capable of requiring @@ -242,43 +250,16 @@ trait TableFeatureSupport { this: Protocol => } /** - * Determine whether this protocol can be safely downgraded to a new protocol `to`. This - * includes the following: - * - The current protocol needs to support at least writer features. This is because protocol - * downgrade is only supported with table features. - * - The protocol version can only be downgraded when there are no non-legacy table features. - * - We can only remove one feature at a time. - * - When downgrading protocol versions, the resulting versions must support exactly the same - * set of legacy features supported by the current protocol. 
- * - * Note, this not an exhaustive list of downgrade rules. Rather, we check the most important - * downgrade invariants. We also perform checks during feature removal at - * [[AlterTableDropFeatureDeltaCommand]]. + * Determine whether this protocol can be safely downgraded to a new protocol `to`. + * All we need is the implicit and explicit features between the two protocols to match, + * excluding the dropped feature. Note, this accounts for cases where we downgrade + * from table features to legacy protocol versions. */ def canDowngradeTo(to: Protocol, droppedFeatureName: String): Boolean = { - if (!supportsWriterFeatures) return false - - // When `to` protocol does not have any features version downgrades are possible. However, - // the current protocol needs to contain one non-legacy feature. We also allow downgrade when - // there are only legacy features. This is to accommodate the case when the user attempts to - // remove a legacy feature in a table that only contains legacy features. - if (to.readerAndWriterFeatureNames.isEmpty) { - val featureNames = readerAndWriterFeatureNames - droppedFeatureName - val sameLegacyFeaturesSupported = featureNames == to.implicitlySupportedFeatures.map(_.name) - val minRequiredVersions = TableFeatureProtocolUtils.minimumRequiredVersions( - featureNames.flatMap(TableFeature.featureNameToFeature).toSeq) - - return sameLegacyFeaturesSupported && - (to.minReaderVersion, to.minWriterVersion) == minRequiredVersions && - readerAndWriterFeatures.filterNot(_.isLegacyFeature).size <= 1 - } - - // When `to` protocol contains table features we cannot downgrade the protocol version. - if (to.minReaderVersion != this.minReaderVersion) return false - if (to.minWriterVersion != this.minWriterVersion) return false - - // Can only remove a maximum of one feature at a time. 
- (this.readerAndWriterFeatureNames -- to.readerAndWriterFeatureNames).size == 1 + val thisFeatures = this.implicitlyAndExplicitlySupportedFeatures + val toFeatures = to.implicitlyAndExplicitlySupportedFeatures + val droppedFeature = Seq(droppedFeatureName).flatMap(TableFeature.featureNameToFeature) + (thisFeatures -- droppedFeature) == toFeatures } /** @@ -368,13 +349,25 @@ trait TableFeatureSupport { this: Protocol => * features. After we remove the last native feature we downgrade the protocol to (1, 1). */ def downgradeProtocolVersionsIfNeeded: Protocol = { - if (!readerAndWriterFeatures.forall(_.isLegacyFeature)) return this + if (nativeReaderAndWriterFeatures.nonEmpty) { + val (minReaderVersion, minWriterVersion) = + TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures) + // It is guaranteed by the definitions of WriterFeature and ReaderFeature, that we cannot + // end up with invalid protocol versions such as (3, 3). Nevertheless, + // we double check it here. 
+ val newProtocol = + Protocol(minReaderVersion, minWriterVersion).withFeatures(readerAndWriterFeatures) + assert( + newProtocol.supportsWriterFeatures, + s"Downgraded protocol should at least support writer features, but got $newProtocol.") + return newProtocol + } val (minReaderVersion, minWriterVersion) = TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures) val newProtocol = Protocol(minReaderVersion, minWriterVersion) - require( + assert( !newProtocol.supportsReaderFeatures && !newProtocol.supportsWriterFeatures, s"Downgraded protocol should not support table features, but got $newProtocol.") diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala index 0e7bed1aa75..eb53711f023 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala @@ -3276,7 +3276,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest initialMinWriterVersion: Int, featuresToAdd: Seq[TableFeature], featuresToRemove: Seq[TableFeature], - expectedDowngradedProtocol: Protocol): Unit = { + expectedDowngradedProtocol: Protocol, + truncateHistory: Boolean = false): Unit = { withTempDir { dir => val deltaLog = DeltaLog.forTable(spark, dir) @@ -3297,8 +3298,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest |)""".stripMargin) for (feature <- featuresToRemove) { - AlterTableDropFeatureDeltaCommand(DeltaTableV2(spark, deltaLog.dataPath), feature.name) - .run(spark) + AlterTableDropFeatureDeltaCommand( + table = DeltaTableV2(spark, deltaLog.dataPath), + featureName = feature.name, + truncateHistory = truncateHistory).run(spark) } assert(deltaLog.update().protocol === expectedDowngradedProtocol) } @@ -3345,7 +3348,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest expectedDowngradedProtocol = Protocol(1, 1)) } - 
test("Downgrade protocol version on table created with table features") { + test("Downgrade protocol version on table created with (3, 7)") { // When the table is initialized with table features there are no active (implicit) legacy // features. After removing the last table feature we downgrade back to (1, 1). testProtocolVersionDowngrade( @@ -3356,7 +3359,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest expectedDowngradedProtocol = Protocol(1, 1)) } - test("Downgrade protocol version on table created with writer features") { + test("Downgrade protocol version on table created with (1, 7)") { testProtocolVersionDowngrade( initialMinReaderVersion = 1, initialMinWriterVersion = 7, @@ -3418,7 +3421,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest expectedDowngradedProtocol = protocolWithWriterFeature(DomainMetadataTableFeature)) } - test("Protocol version is not downgraded when reader+writer features exist") { + test("Protocol version is not downgraded when multiple reader+writer features exist") { testProtocolVersionDowngrade( initialMinReaderVersion = 3, initialMinWriterVersion = 7, @@ -3427,15 +3430,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest expectedDowngradedProtocol = protocolWithReaderFeature(DeletionVectorsTableFeature)) } - test("Protocol version is not downgraded when both reader+writer and writer features exist") { - testProtocolVersionDowngrade( - initialMinReaderVersion = 3, - initialMinWriterVersion = 7, - featuresToAdd = Seq(TestRemovableReaderWriterFeature, TestRemovableWriterFeature), - featuresToRemove = Seq(TestRemovableReaderWriterFeature), - expectedDowngradedProtocol = - Protocol(3, 7, Some(Set.empty), Some(Set(TestRemovableWriterFeature.name)))) - + test("Protocol version is not downgraded when reader+writer features exist") { testProtocolVersionDowngrade( initialMinReaderVersion = 3, initialMinWriterVersion = 7, @@ -3473,6 +3468,50 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } } + for 
(truncateHistory <- BOOLEAN_DOMAIN) + test(s"Protocol version downgrade with Table Features - Basic test " + + s"truncateHistory: ${truncateHistory}") { + val expectedFeatures = Seq(RowTrackingFeature, DomainMetadataTableFeature) + + testProtocolVersionDowngrade( + initialMinReaderVersion = 3, + initialMinWriterVersion = 7, + featuresToAdd = expectedFeatures :+ TestRemovableReaderWriterFeature, + featuresToRemove = Seq(TestRemovableReaderWriterFeature), + expectedDowngradedProtocol = Protocol(1, 7).withFeatures(expectedFeatures), + truncateHistory = truncateHistory) + } + + for (truncateHistory <- BOOLEAN_DOMAIN) + test(s"Protocol version downgrade with Table Features - include legacy writer features: " + + s"truncateHistory: ${truncateHistory}") { + val expectedFeatures = + Seq(DomainMetadataTableFeature, ChangeDataFeedTableFeature, AppendOnlyTableFeature) + + testProtocolVersionDowngrade( + initialMinReaderVersion = 3, + initialMinWriterVersion = 7, + featuresToAdd = expectedFeatures :+ TestRemovableReaderWriterFeature, + featuresToRemove = Seq(TestRemovableReaderWriterFeature), + expectedDowngradedProtocol = Protocol(1, 7).withFeatures(expectedFeatures), + truncateHistory = truncateHistory) + } + + for (truncateHistory <- BOOLEAN_DOMAIN) + test(s"Protocol version downgrade with Table Features - include legacy reader features: " + + s"truncateHistory: ${truncateHistory}") { + val expectedFeatures = + Seq(DomainMetadataTableFeature, ChangeDataFeedTableFeature, ColumnMappingTableFeature) + + testProtocolVersionDowngrade( + initialMinReaderVersion = 3, + initialMinWriterVersion = 7, + featuresToAdd = expectedFeatures :+ TestRemovableReaderWriterFeature, + featuresToRemove = Seq(TestRemovableReaderWriterFeature), + expectedDowngradedProtocol = Protocol(2, 7).withFeatures(expectedFeatures), + truncateHistory = truncateHistory) + } + private def dropV2CheckpointsTableFeature(spark: SparkSession, log: DeltaLog): Unit = { spark.sql(s"ALTER TABLE delta.`${log.dataPath}` 
DROP FEATURE " + s"`${V2CheckpointTableFeature.name}`") diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala index e56da1fe7b0..e922e3acb8a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala @@ -276,12 +276,11 @@ class DeltaTableFeatureSuite test("protocol downgrade compatibility") { val tableFeatureProtocol = Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - // Cannot downgrade when the original protocol does not support at a minimum writer features. - assert(!Protocol(1, 6).canDowngradeTo(Protocol(1, 6), droppedFeatureName = "")) - assert(tableFeatureProtocol.withFeature(TestWriterFeature) - .canDowngradeTo(Protocol(1, 1), droppedFeatureName = TestWriterFeature.name)) assert(Protocol(1, 7).withFeature(TestWriterFeature) - .canDowngradeTo(Protocol(1, 1), droppedFeatureName = TestWriterFeature.name)) + .canDowngradeTo(Protocol(1, 7), droppedFeatureName = TestWriterFeature.name)) + // When there are no explicit features the protocol versions need to be downgraded + // below table features. The new protocol versions need to match exactly the supported + // legacy features. for (n <- 1 to 3) { assert( !Protocol(n, 7) @@ -292,31 +291,13 @@ class DeltaTableFeatureSuite .withFeatures(Seq(TestWriterFeature, AppendOnlyTableFeature, InvariantsTableFeature)) .canDowngradeTo(Protocol(1, 2), droppedFeatureName = TestWriterFeature.name)) } - // When there are no explicit features the protocol versions need to be downgraded - // below table features. 
- assert(!tableFeatureProtocol.withFeature(TestWriterFeature) - .canDowngradeTo(tableFeatureProtocol, droppedFeatureName = TestWriterFeature.name)) - assert(!tableFeatureProtocol.withFeature(TestWriterFeature) - .canDowngradeTo(Protocol(2, 7), droppedFeatureName = TestWriterFeature.name)) - // Only one non-legacy writer feature per time. - assert(!tableFeatureProtocol.withFeatures(Seq(TestWriterFeature, TestRemovableWriterFeature)) - .canDowngradeTo(tableFeatureProtocol, droppedFeatureName = TestWriterFeature.name)) - // Remove reader+writer feature. assert(tableFeatureProtocol.withFeatures(Seq(TestReaderWriterFeature)) .canDowngradeTo(Protocol(1, 1), droppedFeatureName = TestReaderWriterFeature.name)) - // Only one non-legacy feature at a time - multiple reader+writer features. - assert( - !tableFeatureProtocol - .withFeatures(Seq(TestReaderWriterFeature, TestReaderWriterMetadataAutoUpdateFeature)) - .canDowngradeTo(tableFeatureProtocol, droppedFeatureName = "")) assert( tableFeatureProtocol .merge(Protocol(2, 5)) .withFeatures(Seq(TestReaderWriterFeature, TestRemovableLegacyReaderWriterFeature)) .canDowngradeTo(Protocol(2, 5), droppedFeatureName = TestReaderWriterFeature.name)) - // Only one feature at a time - mix of reader+writer and writer features. - assert(!tableFeatureProtocol.withFeatures(Seq(TestWriterFeature, TestReaderWriterFeature)) - .canDowngradeTo(tableFeatureProtocol, droppedFeatureName = TestWriterFeature.name)) // Downgraded protocol must be able to support all legacy table features. assert( !tableFeatureProtocol