From 75c6acbb1cffe745b3616703185f91a76b2ea962 Mon Sep 17 00:00:00 2001 From: Jiaheng Tang Date: Mon, 17 Jun 2024 10:47:54 -0700 Subject: [PATCH] [Spark] Support show tblproperties and update catalog for clustered table (#3271) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Support show tblproperties for clustered table, and support updating the clustering column properties in the catalog. Remove table properties from describe detail's output since that's using the properties from metadata. ## How was this patch tested? Add verification for table properties, describe detail, and catalog table in verifyClusteringColumns. ## Does this PR introduce _any_ user-facing changes? No --- .../sql/delta/catalog/DeltaCatalog.scala | 4 +- .../sql/delta/catalog/DeltaTableV2.scala | 20 ++++++++ .../spark/sql/delta/hooks/UpdateCatalog.scala | 36 ++++++++++++- .../schema/ImplicitMetadataOperation.scala | 4 +- .../clustering/temp/ClusterBySpec.scala | 11 ++++ .../delta/tables/DeltaTableBuilderSuite.scala | 20 +++++--- .../skipping/ClusteredTableTestUtils.scala | 42 +++++++++++++++- .../clustering/ClusteredTableDDLSuite.scala | 50 +++++++++++++++++-- .../IncrementalZCubeClusteringSuite.scala | 4 +- 9 files changed, 174 insertions(+), 17 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala index 2eef89c9e29..e37598b29e9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala @@ -449,7 +449,9 @@ class DeltaCatalog extends DelegatingCatalogExtension var validatedConfigurations = DeltaConfigs.validateConfigurations(tableDesc.properties) ClusteredTableUtils.validateExistingTableFeatureProperties(validatedConfigurations) - // Add needed configs for Clustered table. + // Add needed configs for Clustered table. Note that [[PROP_CLUSTERING_COLUMNS]] can only + // be added after [[DeltaConfigs.validateConfigurations]] to avoid non-user configurable check + // failure. if (maybeClusterBySpec.nonEmpty) { validatedConfigurations = validatedConfigurations ++ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index ca24cf5f12c..e3aec66e88e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -22,6 +22,8 @@ import java.{util => ju} import scala.collection.JavaConverters._ import scala.collection.mutable +import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumnInfo} +import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.commands.WriteIntoDelta import org.apache.spark.sql.delta.commands.cdc.CDCReader @@ -193,6 +195,13 @@ case class DeltaTableV2( base.put(TableCatalog.PROP_EXTERNAL, "true") } } + // Don't use [[PROP_CLUSTERING_COLUMNS]] from CatalogTable because it may be stale. + // Since ALTER TABLE updates it using an async post-commit hook. 
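+    // Instead, derive the value from clusterBySpec below, which reads the clustering
+    // metadata domain of the snapshot backing this DeltaTableV2.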
+ clusterBySpec.foreach { clusterBy => + ClusterBySpec.toProperties(clusterBy).foreach { case (key, value) => + base.put(key, value) + } + } Option(initialSnapshot.metadata.description).foreach(base.put(TableCatalog.PROP_COMMENT, _)) base.asJava } @@ -322,6 +331,17 @@ case class DeltaTableV2( override def v1Table: CatalogTable = ttSafeCatalogTable.getOrElse { throw DeltaErrors.invalidV1TableCall("v1Table", "DeltaTableV2") } + + lazy val clusterBySpec: Option[ClusterBySpec] = { + // Always get the clustering columns from metadata domain in delta log. + if (ClusteredTableUtils.isSupported(initialSnapshot.protocol)) { + val clusteringColumns = ClusteringColumnInfo.extractLogicalNames( + initialSnapshot) + Some(ClusterBySpec.fromColumnNames(clusteringColumns)) + } else { + None + } + } } object DeltaTableV2 { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/UpdateCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/UpdateCatalog.scala index 5dd9dc6ffff..dd8f295e829 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/UpdateCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/UpdateCatalog.scala @@ -24,6 +24,8 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException} import scala.util.Try import scala.util.control.NonFatal +import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumnInfo} +import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec import org.apache.spark.sql.delta.{DeltaConfigs, DeltaTableIdentifier, OptimisticTransactionImpl, Snapshot} import org.apache.spark.sql.delta.actions.{Action, Metadata} import org.apache.spark.sql.delta.metering.DeltaLogging @@ -123,6 +125,20 @@ trait UpdateCatalogBase extends PostCommitHook with DeltaLogging { return DeltaTableIdentifier.isDeltaPath(spark, table.identifier) } + /** Check if the clustering columns from snapshot doesn't match what's in the table properties. */ + protected def clusteringColumnsChanged(snapshot: Snapshot): Boolean = { + if (!ClusteredTableUtils.isSupported(snapshot.protocol)) { + return false + } + val currentLogicalClusteringNames = + ClusteringColumnInfo.extractLogicalNames(snapshot).mkString(",") + val clusterBySpecOpt = ClusterBySpec.fromProperties(table.properties) + + // Since we don't remove the clustering columns table property, this can't happen. + assert(!(currentLogicalClusteringNames.nonEmpty && clusterBySpecOpt.isEmpty)) + clusterBySpecOpt.exists(_.columnNames.map(_.toString).mkString(",") != + currentLogicalClusteringNames) + } /** Update the entry in the Catalog to reflect the latest schema and table properties. */ protected def execute( @@ -161,6 +177,15 @@ trait UpdateCatalogBase extends PostCommitHook with DeltaLogging { "delta.catalog.update.properties", data = loggingData ) + } else if (clusteringColumnsChanged(snapshot)) { + // If the clustering columns changed, we'll update the catalog with the new + // table properties. + updateProperties(spark, snapshot) + recordDeltaEvent( + snapshot.deltaLog, + "delta.catalog.update.clusteringColumns", + data = loggingData + ) } } catch { case NonFatal(e) => @@ -259,7 +284,8 @@ case class UpdateCatalog(table: CatalogTable) extends UpdateCatalogBase { } object UpdateCatalog { - private var tp: ExecutionContext = _ + // Exposed for testing. 
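+  // Test suites reset this to null so that a new thread pool (and with it a reference to the
+  // current SparkContext) is created for the next suite; see the afterAll override added in
+  // ClusteredTableDDLSuite.scala.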
+ private[delta] var tp: ExecutionContext = _ // This is the encoding of the database for the Hive MetaStore private val latin1 = Charset.forName("ISO-8859-1") @@ -344,6 +370,14 @@ object UpdateCatalog { snapshot.getProperties.toMap ++ Map( DeltaConfigs.METASTORE_LAST_UPDATE_VERSION -> snapshot.version.toString, DeltaConfigs.METASTORE_LAST_COMMIT_TIMESTAMP -> snapshot.timestamp.toString) + if (ClusteredTableUtils.isSupported(snapshot.protocol)) { + val clusteringColumns = ClusteringColumnInfo.extractLogicalNames(snapshot) + val properties = ClusterBySpec.toProperties( + ClusterBySpec.fromColumnNames(clusteringColumns)) + properties.foreach { case (key, value) => + newProperties += (key -> value) + } + } newProperties } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala index 5934cb9d3af..53503bb2cb2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala @@ -107,7 +107,9 @@ trait ImplicitMetadataOperation extends DeltaLogging { throw DeltaErrors.unexpectedDataChangeException("Create a Delta table") } val description = configuration.get("comment").orNull - val cleanedConfs = configuration.filterKeys(_ != "comment").toMap + // Filter out the property for clustering columns from Metadata action. + val cleanedConfs = ClusteredTableUtils.removeClusteringColumnsProperty( + configuration.filterKeys(_ != "comment").toMap) txn.updateMetadata( Metadata( description = description, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterBySpec.scala b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterBySpec.scala index d73f8a436ee..48a65ba382d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterBySpec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/temp/ClusterBySpec.scala @@ -67,6 +67,17 @@ object ClusterBySpec { ClusteredTableUtils.PROP_CLUSTERING_COLUMNS -> clusterBySpec.toJson } + def fromProperties(properties: Map[String, String]): Option[ClusterBySpec] = { + properties.get(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS).map { clusteringColumns => + fromProperty(clusteringColumns) + } + } + + def toProperties(clusterBySpec: ClusterBySpec): Map[String, String] = { + val columnValue = mapper.writeValueAsString(clusterBySpec.columnNames.map(_.fieldNames)) + Map(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS -> columnValue) + } + def fromColumnNames(names: Seq[String]): ClusterBySpec = { ClusterBySpec(names.map(FieldReference(_))) } diff --git a/spark/src/test/scala/io/delta/tables/DeltaTableBuilderSuite.scala b/spark/src/test/scala/io/delta/tables/DeltaTableBuilderSuite.scala index 6839223b063..cbeb5b24bc7 100644 --- a/spark/src/test/scala/io/delta/tables/DeltaTableBuilderSuite.scala +++ b/spark/src/test/scala/io/delta/tables/DeltaTableBuilderSuite.scala @@ -466,15 +466,19 @@ class DeltaTableBuilderSuite } test("create table with clustering") { - withTable("test") { - io.delta.tables.DeltaTable.create().tableName("test") - .addColumn("c1", "int") - .clusterBy("c1") - .execute() + withSQLConf( + // Enable update catalog for verifyClusteringColumns. 
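+      // verifyClusteringColumns asserts that this conf is enabled before it waits for the
+      // UpdateCatalog post-commit hook and checks the clustering columns in the catalog.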
+ DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") { + withTable("test") { + io.delta.tables.DeltaTable.create().tableName("test") + .addColumn("c1", "int") + .clusterBy("c1") + .execute() - val deltaLog = DeltaLog.forTable(spark, TableIdentifier("test")) - val metadata = deltaLog.snapshot.metadata - verifyClusteringColumns(TableIdentifier("test"), Seq("c1")) + val deltaLog = DeltaLog.forTable(spark, TableIdentifier("test")) + val metadata = deltaLog.snapshot.metadata + verifyClusteringColumns(TableIdentifier("test"), Seq("c1")) + } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala index 8fb439211b0..1ab4f0ce10e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala @@ -17,10 +17,14 @@ package org.apache.spark.sql.delta.skipping import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumn, ClusteringColumnInfo} +import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec import org.apache.spark.sql.delta.{DeltaLog, Snapshot} import org.apache.spark.sql.delta.DeltaOperations.{CLUSTERING_PARAMETER_KEY, ZORDER_PARAMETER_KEY} import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics +import org.apache.spark.sql.delta.hooks.UpdateCatalog +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.JsonUtils +import org.junit.Assert.assertEquals import org.apache.spark.SparkFunSuite import org.apache.spark.sql.DataFrame @@ -210,7 +214,8 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession def verifyClusteringColumns( tableIdentifier: TableIdentifier, - expectedLogicalClusteringColumns: Seq[String] + expectedLogicalClusteringColumns: Seq[String], + skipCatalogCheck: Boolean = false ): Unit = { val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier) verifyClusteringColumnsInternal( @@ -218,6 +223,23 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession tableIdentifier.table, expectedLogicalClusteringColumns ) + + if (skipCatalogCheck) { + return + } + + val updateCatalogEnabled = spark.conf.get(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED) + assert(updateCatalogEnabled, + "need to enable [[DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED]] to verify catalog updates.") + UpdateCatalog.awaitCompletion(10000) + val catalog = spark.sessionState.catalog + catalog.refreshTable(tableIdentifier) + val table = catalog.getTableMetadata(tableIdentifier) + + // Verify CatalogTable's clusterBySpec. + assert(ClusteredTableUtils.getClusterBySpecOptional(table).isDefined) + assertEquals(ClusterBySpec.fromColumnNames(expectedLogicalClusteringColumns), + ClusteredTableUtils.getClusterBySpecOptional(table).get) } def verifyClusteringColumns( @@ -243,6 +265,24 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession verifyDescribeHistoryOperationParameters( tableNameOrPath ) + + // Verify DESCRIBE DETAIL's properties doesn't contain the "clusteringColumns" key. + val describeDetailProps = sql(s"describe detail $tableNameOrPath") + .select("properties") + .first + .getAs[Map[String, String]](0) + assert(!describeDetailProps.contains(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS)) + + // Verify SHOW TBLPROPERTIES contains the correct clustering columns. 
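+    // The property value is the JSON-encoded column name list written by
+    // ClusterBySpec.toProperties, so it is parsed back through ClusterBySpec.fromProperties
+    // before comparing the logical column names.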
+ val clusteringColumnsVal = + sql(s"show tblproperties $tableNameOrPath") + .filter($"key" === ClusteredTableUtils.PROP_CLUSTERING_COLUMNS) + .select("value") + .first + .getString(0) + val clusterBySpec = ClusterBySpec.fromProperties( + Map(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS -> clusteringColumnsVal)).get + assert(expectedLogicalClusteringColumns === clusterBySpec.columnNames.map(_.toString)) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala index 2edf7354be6..6ea5ca59d61 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala @@ -22,6 +22,7 @@ import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions} import org.apache.spark.sql.delta.skipping.ClusteredTableTestUtils import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingEnableIdMode, DeltaColumnMappingEnableNameMode, DeltaConfigs, DeltaExcludedBySparkVersionTestMixinShims, DeltaLog, DeltaUnsupportedOperationException} import org.apache.spark.sql.delta.clustering.ClusteringMetadataDomain +import org.apache.spark.sql.delta.hooks.UpdateCatalog import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.stats.SkippingEligibleDataType import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest} @@ -37,6 +38,35 @@ trait ClusteredTableCreateOrReplaceDDLSuiteBase extends QueryTest with SharedSparkSession with ClusteredTableTestUtils { + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key, "true") + } + + override def afterAll(): Unit = { + // Reset UpdateCatalog's thread pool to ensure it is re-initialized in the next test suite. + // This is necessary because the [[SparkThreadLocalForwardingThreadPoolExecutor]] + // retains a reference to the SparkContext. Without resetting, the new test suite would + // reuse the same SparkContext from the previous suite, despite it being stopped. + // + // This will force the UpdateCatalog's background thread to use the new SparkContext. + // + // scalastyle:off line.size.limit + // This is to avoid the following exception thrown from the UpdateCatalog's background thread: + // java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext. 
+ // This stopped SparkContext was created at: + // + // org.apache.spark.sql.delta.skipping.clustering.ClusteredTableDDLDataSourceV2NameColumnMappingSuite.beforeAll + // + // The currently active SparkContext was created at: + // + // org.apache.spark.sql.delta.skipping.clustering.ClusteredTableDDLDataSourceV2Suite.beforeAll + // scalastyle:on line.size.limit + UpdateCatalog.tp = null + + super.afterAll() + } + protected val testTable: String = "test_ddl_table" protected val sourceTable: String = "test_ddl_source" protected val targetTable: String = "test_ddl_target" @@ -627,6 +657,17 @@ trait ClusteredTableDDLSuiteBase } } + test("alter table cluster by - catalog reflects clustering columns when reordered") { + withClusteredTable(testTable, "id INT, a STRUCT, name STRING", "id, name") { + val tableIdentifier = TableIdentifier(testTable) + verifyClusteringColumns(tableIdentifier, Seq("id", "name")) + + // Re-order the clustering keys and validate the catalog sees the correctly reordered keys. + sql(s"ALTER TABLE $testTable CLUSTER BY (name, id)") + verifyClusteringColumns(tableIdentifier, Seq("name", "id")) + } + } + test("alter table cluster by - error scenarios") { withClusteredTable(testTable, "id INT, id2 INT, name STRING", "id, name") { // Specify non-existing columns. @@ -862,7 +903,7 @@ trait ClusteredTableDDLSuiteBase sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0") val (_, currentSnapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier) - verifyClusteringColumns(tableIdentifier, Seq.empty) + verifyClusteringColumns(tableIdentifier, Seq.empty, skipCatalogCheck = true) } // Scenario 2: restore clustered table to previous clustering columns. @@ -873,7 +914,7 @@ trait ClusteredTableDDLSuiteBase verifyClusteringColumns(tableIdentifier, Seq("b")) sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0") - verifyClusteringColumns(tableIdentifier, Seq("a")) + verifyClusteringColumns(tableIdentifier, Seq("a"), skipCatalogCheck = true) } // Scenario 3: restore from table with clustering columns to non-empty clustering columns @@ -884,7 +925,7 @@ trait ClusteredTableDDLSuiteBase verifyClusteringColumns(tableIdentifier, Seq.empty) sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0") - verifyClusteringColumns(tableIdentifier, Seq("a")) + verifyClusteringColumns(tableIdentifier, Seq("a"), skipCatalogCheck = true) } // Scenario 4: restore to start version. @@ -894,7 +935,7 @@ trait ClusteredTableDDLSuiteBase sql(s"INSERT INTO $testTable VALUES (1)") sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0") - verifyClusteringColumns(tableIdentifier, Seq("a")) + verifyClusteringColumns(tableIdentifier, Seq("a"), skipCatalogCheck = true) } // Scenario 5: restore unclustered table to unclustered table. 
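The SHOW TBLPROPERTIES and catalog assertions in these suites rely on the ClusterBySpec
round-trip helpers added in this patch. A minimal sketch of that round-trip (illustration
only, not part of the diff), assuming simple top-level column names:

    import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec

    val spec = ClusterBySpec.fromColumnNames(Seq("name", "id"))
    val props = ClusterBySpec.toProperties(spec)
    // props maps ClusteredTableUtils.PROP_CLUSTERING_COLUMNS to a JSON value like [["name"],["id"]].
    val restored = ClusterBySpec.fromProperties(props)
    assert(restored.get.columnNames.map(_.toString) == Seq("name", "id"))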
@@ -933,6 +974,7 @@ trait ClusteredTableDDLSuiteBase } trait ClusteredTableDDLSuite extends ClusteredTableDDLSuiteBase + trait ClusteredTableDDLWithNameColumnMapping extends ClusteredTableCreateOrReplaceDDLSuite with DeltaColumnMappingEnableNameMode diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/IncrementalZCubeClusteringSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/IncrementalZCubeClusteringSuite.scala index cecaa9c7a58..36e46355a1f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/IncrementalZCubeClusteringSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/IncrementalZCubeClusteringSuite.scala @@ -191,7 +191,9 @@ class IncrementalZCubeClusteringSuite extends QueryTest test("test changing clustering columns") { withSQLConf( - SQLConf.MAX_RECORDS_PER_FILE.key -> "2") { + SQLConf.MAX_RECORDS_PER_FILE.key -> "2", + // Enable update catalog for verifyClusteringColumns. + DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") { withClusteredTable( table = table, schema = "col1 int, col2 int",
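For reference, a minimal end-to-end sketch of the user-facing flow these changes enable
(illustration only, not part of the diff), assuming a Spark session with the Delta catalog
configured; the table name `t` is illustrative:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.delta.hooks.UpdateCatalog
    import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
    import org.apache.spark.sql.delta.sources.DeltaSQLConf

    // Let the UpdateCatalog post-commit hook sync table metadata, including clustering columns.
    spark.conf.set(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key, "true")

    spark.sql("CREATE TABLE t (id INT, name STRING) USING delta CLUSTER BY (id)")
    // SHOW TBLPROPERTIES now surfaces the clustering columns property
    // (key: ClusteredTableUtils.PROP_CLUSTERING_COLUMNS).
    spark.sql("SHOW TBLPROPERTIES t").show(false)

    // Changing the clustering columns triggers an asynchronous catalog update.
    spark.sql("ALTER TABLE t CLUSTER BY (name)")
    UpdateCatalog.awaitCompletion(10000)

    val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
    // Expected to reflect CLUSTER BY (name) once the hook has finished.
    ClusteredTableUtils.getClusterBySpecOptional(catalogTable).foreach(println)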