[Spark] Support show tblproperties and update catalog for clustered table (#3271)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Support `SHOW TBLPROPERTIES` for clustered tables, and update the
clustering-columns property in the catalog through the `UpdateCatalog`
post-commit hook. Also remove the clustering-columns property from
`DESCRIBE DETAIL`'s output, since that command builds its output from the
properties stored in the table metadata.
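
For illustration, the intended behavior is roughly the following (a sketch,
assuming a Spark session with this change; the property key is
`ClusteredTableUtils.PROP_CLUSTERING_COLUMNS` and its value is the
JSON-encoded clustering columns):

```scala
// Sketch only -- illustrates the intended behavior, not code from this PR.
spark.sql("CREATE TABLE t (id INT, name STRING) USING delta CLUSTER BY (id, name)")

// SHOW TBLPROPERTIES now surfaces the clustering-columns property, derived from
// the clustering metadata domain and kept up to date in the catalog.
spark.sql("SHOW TBLPROPERTIES t").show(truncate = false)

// DESCRIBE DETAIL reports the properties stored in the table metadata, so the
// clustering-columns entry no longer appears in its `properties` column.
spark.sql("DESCRIBE DETAIL t").select("properties").show(truncate = false)
```
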
## How was this patch tested?

Added verification of the table properties (`SHOW TBLPROPERTIES`), the
`DESCRIBE DETAIL` output, and the catalog table to `verifyClusteringColumns`.
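
Concretely, the updated suites follow roughly this pattern (a sketch based on
the test changes in this PR; the table and column names are illustrative):

```scala
// Sketch: catalog updates run in UpdateCatalog's async post-commit hook, so the
// conf must be enabled for the catalog check inside verifyClusteringColumns to run.
withSQLConf(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") {
  withTable("test") {
    io.delta.tables.DeltaTable.create().tableName("test")
      .addColumn("c1", "int")
      .clusterBy("c1")
      .execute()
    // Verifies SHOW TBLPROPERTIES, DESCRIBE DETAIL, and the catalog entry.
    verifyClusteringColumns(TableIdentifier("test"), Seq("c1"))
  }
}
```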


## Does this PR introduce _any_ user-facing changes?

No
zedtang authored Jun 17, 2024
1 parent 96ae1c5 commit 75c6acb
Showing 9 changed files with 174 additions and 17 deletions.
@@ -449,7 +449,9 @@ class DeltaCatalog extends DelegatingCatalogExtension
var validatedConfigurations =
DeltaConfigs.validateConfigurations(tableDesc.properties)
ClusteredTableUtils.validateExistingTableFeatureProperties(validatedConfigurations)
// Add needed configs for Clustered table.
// Add needed configs for Clustered table. Note that [[PROP_CLUSTERING_COLUMNS]] can only
// be added after [[DeltaConfigs.validateConfigurations]] to avoid non-user configurable check
// failure.
if (maybeClusterBySpec.nonEmpty) {
validatedConfigurations =
validatedConfigurations ++
@@ -22,6 +22,8 @@ import java.{util => ju}
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumnInfo}
import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.commands.cdc.CDCReader
@@ -193,6 +195,13 @@ case class DeltaTableV2(
base.put(TableCatalog.PROP_EXTERNAL, "true")
}
}
// Don't use [[PROP_CLUSTERING_COLUMNS]] from the CatalogTable because it may be stale,
// since ALTER TABLE updates it using an async post-commit hook.
clusterBySpec.foreach { clusterBy =>
ClusterBySpec.toProperties(clusterBy).foreach { case (key, value) =>
base.put(key, value)
}
}
Option(initialSnapshot.metadata.description).foreach(base.put(TableCatalog.PROP_COMMENT, _))
base.asJava
}
@@ -322,6 +331,17 @@ case class DeltaTableV2(
override def v1Table: CatalogTable = ttSafeCatalogTable.getOrElse {
throw DeltaErrors.invalidV1TableCall("v1Table", "DeltaTableV2")
}

lazy val clusterBySpec: Option[ClusterBySpec] = {
// Always get the clustering columns from metadata domain in delta log.
if (ClusteredTableUtils.isSupported(initialSnapshot.protocol)) {
val clusteringColumns = ClusteringColumnInfo.extractLogicalNames(
initialSnapshot)
Some(ClusterBySpec.fromColumnNames(clusteringColumns))
} else {
None
}
}
}

object DeltaTableV2 {
@@ -24,6 +24,8 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.util.Try
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumnInfo}
import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
import org.apache.spark.sql.delta.{DeltaConfigs, DeltaTableIdentifier, OptimisticTransactionImpl, Snapshot}
import org.apache.spark.sql.delta.actions.{Action, Metadata}
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -123,6 +125,20 @@ trait UpdateCatalogBase extends PostCommitHook with DeltaLogging {
return DeltaTableIdentifier.isDeltaPath(spark, table.identifier)
}

/** Returns true if the clustering columns in the snapshot don't match those in the table properties. */
protected def clusteringColumnsChanged(snapshot: Snapshot): Boolean = {
if (!ClusteredTableUtils.isSupported(snapshot.protocol)) {
return false
}
val currentLogicalClusteringNames =
ClusteringColumnInfo.extractLogicalNames(snapshot).mkString(",")
val clusterBySpecOpt = ClusterBySpec.fromProperties(table.properties)

// Since we never remove the clustering columns table property, the snapshot can't have
// clustering columns while the catalog property is missing.
assert(!(currentLogicalClusteringNames.nonEmpty && clusterBySpecOpt.isEmpty))
clusterBySpecOpt.exists(_.columnNames.map(_.toString).mkString(",") !=
currentLogicalClusteringNames)
}

/** Update the entry in the Catalog to reflect the latest schema and table properties. */
protected def execute(
@@ -161,6 +177,15 @@ trait UpdateCatalogBase extends PostCommitHook with DeltaLogging {
"delta.catalog.update.properties",
data = loggingData
)
} else if (clusteringColumnsChanged(snapshot)) {
// If the clustering columns changed, we'll update the catalog with the new
// table properties.
updateProperties(spark, snapshot)
recordDeltaEvent(
snapshot.deltaLog,
"delta.catalog.update.clusteringColumns",
data = loggingData
)
}
} catch {
case NonFatal(e) =>
@@ -259,7 +284,8 @@ case class UpdateCatalog(table: CatalogTable) extends UpdateCatalogBase {
}

object UpdateCatalog {
private var tp: ExecutionContext = _
// Exposed for testing.
private[delta] var tp: ExecutionContext = _

// This is the encoding of the database for the Hive MetaStore
private val latin1 = Charset.forName("ISO-8859-1")
@@ -344,6 +370,14 @@ object UpdateCatalog {
snapshot.getProperties.toMap ++ Map(
DeltaConfigs.METASTORE_LAST_UPDATE_VERSION -> snapshot.version.toString,
DeltaConfigs.METASTORE_LAST_COMMIT_TIMESTAMP -> snapshot.timestamp.toString)
if (ClusteredTableUtils.isSupported(snapshot.protocol)) {
val clusteringColumns = ClusteringColumnInfo.extractLogicalNames(snapshot)
val properties = ClusterBySpec.toProperties(
ClusterBySpec.fromColumnNames(clusteringColumns))
properties.foreach { case (key, value) =>
newProperties += (key -> value)
}
}
newProperties
}

@@ -107,7 +107,9 @@ trait ImplicitMetadataOperation extends DeltaLogging {
throw DeltaErrors.unexpectedDataChangeException("Create a Delta table")
}
val description = configuration.get("comment").orNull
val cleanedConfs = configuration.filterKeys(_ != "comment").toMap
// Filter out the clustering columns property from the Metadata action.
val cleanedConfs = ClusteredTableUtils.removeClusteringColumnsProperty(
configuration.filterKeys(_ != "comment").toMap)
txn.updateMetadata(
Metadata(
description = description,
@@ -67,6 +67,17 @@ object ClusterBySpec {
ClusteredTableUtils.PROP_CLUSTERING_COLUMNS -> clusterBySpec.toJson
}

def fromProperties(properties: Map[String, String]): Option[ClusterBySpec] = {
properties.get(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS).map { clusteringColumns =>
fromProperty(clusteringColumns)
}
}

def toProperties(clusterBySpec: ClusterBySpec): Map[String, String] = {
val columnValue = mapper.writeValueAsString(clusterBySpec.columnNames.map(_.fieldNames))
Map(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS -> columnValue)
}

def fromColumnNames(names: Seq[String]): ClusterBySpec = {
ClusterBySpec(names.map(FieldReference(_)))
}
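
As a rough illustration of how the new `ClusterBySpec` helpers above round-trip
the clustering columns through table properties (a sketch, not part of the diff;
column names are made up):

```scala
import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec

// Build a spec from logical column names.
val spec = ClusterBySpec.fromColumnNames(Seq("id", "name"))

// toProperties stores the columns under ClusteredTableUtils.PROP_CLUSTERING_COLUMNS,
// with a JSON-encoded value along the lines of [["id"],["name"]].
val props = ClusterBySpec.toProperties(spec)

// fromProperties recovers the spec from a table-properties map; comparing the
// logical column names mirrors what the updated tests do.
assert(ClusterBySpec.fromProperties(props)
  .map(_.columnNames.map(_.toString)) == Some(Seq("id", "name")))
```
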
20 changes: 12 additions & 8 deletions spark/src/test/scala/io/delta/tables/DeltaTableBuilderSuite.scala
@@ -466,15 +466,19 @@ class DeltaTableBuilderSuite
}

test("create table with clustering") {
withTable("test") {
io.delta.tables.DeltaTable.create().tableName("test")
.addColumn("c1", "int")
.clusterBy("c1")
.execute()
withSQLConf(
// Enable update catalog for verifyClusteringColumns.
DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") {
withTable("test") {
io.delta.tables.DeltaTable.create().tableName("test")
.addColumn("c1", "int")
.clusterBy("c1")
.execute()

val deltaLog = DeltaLog.forTable(spark, TableIdentifier("test"))
val metadata = deltaLog.snapshot.metadata
verifyClusteringColumns(TableIdentifier("test"), Seq("c1"))
val deltaLog = DeltaLog.forTable(spark, TableIdentifier("test"))
val metadata = deltaLog.snapshot.metadata
verifyClusteringColumns(TableIdentifier("test"), Seq("c1"))
}
}
}

@@ -17,10 +17,14 @@
package org.apache.spark.sql.delta.skipping

import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumn, ClusteringColumnInfo}
import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
import org.apache.spark.sql.delta.DeltaOperations.{CLUSTERING_PARAMETER_KEY, ZORDER_PARAMETER_KEY}
import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
import org.apache.spark.sql.delta.hooks.UpdateCatalog
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
import org.junit.Assert.assertEquals

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.DataFrame
@@ -210,14 +214,32 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession

def verifyClusteringColumns(
tableIdentifier: TableIdentifier,
expectedLogicalClusteringColumns: Seq[String]
expectedLogicalClusteringColumns: Seq[String],
skipCatalogCheck: Boolean = false
): Unit = {
val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier)
verifyClusteringColumnsInternal(
snapshot,
tableIdentifier.table,
expectedLogicalClusteringColumns
)

if (skipCatalogCheck) {
return
}

val updateCatalogEnabled = spark.conf.get(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED)
assert(updateCatalogEnabled,
"need to enable [[DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED]] to verify catalog updates.")
UpdateCatalog.awaitCompletion(10000)
val catalog = spark.sessionState.catalog
catalog.refreshTable(tableIdentifier)
val table = catalog.getTableMetadata(tableIdentifier)

// Verify CatalogTable's clusterBySpec.
assert(ClusteredTableUtils.getClusterBySpecOptional(table).isDefined)
assertEquals(ClusterBySpec.fromColumnNames(expectedLogicalClusteringColumns),
ClusteredTableUtils.getClusterBySpecOptional(table).get)
}

def verifyClusteringColumns(
@@ -243,6 +265,24 @@
verifyDescribeHistoryOperationParameters(
tableNameOrPath
)

// Verify that DESCRIBE DETAIL's properties column doesn't contain the "clusteringColumns" key.
val describeDetailProps = sql(s"describe detail $tableNameOrPath")
.select("properties")
.first
.getAs[Map[String, String]](0)
assert(!describeDetailProps.contains(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS))

// Verify SHOW TBLPROPERTIES contains the correct clustering columns.
val clusteringColumnsVal =
sql(s"show tblproperties $tableNameOrPath")
.filter($"key" === ClusteredTableUtils.PROP_CLUSTERING_COLUMNS)
.select("value")
.first
.getString(0)
val clusterBySpec = ClusterBySpec.fromProperties(
Map(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS -> clusteringColumnsVal)).get
assert(expectedLogicalClusteringColumns === clusterBySpec.columnNames.map(_.toString))
}
}

@@ -22,6 +22,7 @@ import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions}
import org.apache.spark.sql.delta.skipping.ClusteredTableTestUtils
import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingEnableIdMode, DeltaColumnMappingEnableNameMode, DeltaConfigs, DeltaExcludedBySparkVersionTestMixinShims, DeltaLog, DeltaUnsupportedOperationException}
import org.apache.spark.sql.delta.clustering.ClusteringMetadataDomain
import org.apache.spark.sql.delta.hooks.UpdateCatalog
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.SkippingEligibleDataType
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
@@ -37,6 +38,35 @@ trait ClusteredTableCreateOrReplaceDDLSuiteBase extends QueryTest
with SharedSparkSession
with ClusteredTableTestUtils {

override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key, "true")
}

override def afterAll(): Unit = {
// Reset UpdateCatalog's thread pool to ensure it is re-initialized in the next test suite.
// This is necessary because the [[SparkThreadLocalForwardingThreadPoolExecutor]]
// retains a reference to the SparkContext. Without resetting, the new test suite would
// reuse the same SparkContext from the previous suite, despite it being stopped.
//
// This will force the UpdateCatalog's background thread to use the new SparkContext.
//
// scalastyle:off line.size.limit
// This is to avoid the following exception thrown from the UpdateCatalog's background thread:
// java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
// This stopped SparkContext was created at:
//
// org.apache.spark.sql.delta.skipping.clustering.ClusteredTableDDLDataSourceV2NameColumnMappingSuite.beforeAll
//
// The currently active SparkContext was created at:
//
// org.apache.spark.sql.delta.skipping.clustering.ClusteredTableDDLDataSourceV2Suite.beforeAll
// scalastyle:on line.size.limit
UpdateCatalog.tp = null

super.afterAll()
}

protected val testTable: String = "test_ddl_table"
protected val sourceTable: String = "test_ddl_source"
protected val targetTable: String = "test_ddl_target"
@@ -627,6 +657,17 @@ trait ClusteredTableDDLSuiteBase
}
}

test("alter table cluster by - catalog reflects clustering columns when reordered") {
withClusteredTable(testTable, "id INT, a STRUCT<b INT, c STRING>, name STRING", "id, name") {
val tableIdentifier = TableIdentifier(testTable)
verifyClusteringColumns(tableIdentifier, Seq("id", "name"))

// Re-order the clustering keys and validate the catalog sees the correctly reordered keys.
sql(s"ALTER TABLE $testTable CLUSTER BY (name, id)")
verifyClusteringColumns(tableIdentifier, Seq("name", "id"))
}
}

test("alter table cluster by - error scenarios") {
withClusteredTable(testTable, "id INT, id2 INT, name STRING", "id, name") {
// Specify non-existing columns.
@@ -862,7 +903,7 @@ trait ClusteredTableDDLSuiteBase

sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0")
val (_, currentSnapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier)
verifyClusteringColumns(tableIdentifier, Seq.empty)
verifyClusteringColumns(tableIdentifier, Seq.empty, skipCatalogCheck = true)
}

// Scenario 2: restore clustered table to previous clustering columns.
@@ -873,7 +914,7 @@
verifyClusteringColumns(tableIdentifier, Seq("b"))

sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0")
verifyClusteringColumns(tableIdentifier, Seq("a"))
verifyClusteringColumns(tableIdentifier, Seq("a"), skipCatalogCheck = true)
}

// Scenario 3: restore from table with clustering columns to non-empty clustering columns
@@ -884,7 +925,7 @@
verifyClusteringColumns(tableIdentifier, Seq.empty)

sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0")
verifyClusteringColumns(tableIdentifier, Seq("a"))
verifyClusteringColumns(tableIdentifier, Seq("a"), skipCatalogCheck = true)
}

// Scenario 4: restore to start version.
@@ -894,7 +935,7 @@
sql(s"INSERT INTO $testTable VALUES (1)")

sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0")
verifyClusteringColumns(tableIdentifier, Seq("a"))
verifyClusteringColumns(tableIdentifier, Seq("a"), skipCatalogCheck = true)
}

// Scenario 5: restore unclustered table to unclustered table.
@@ -933,6 +974,7 @@
}

trait ClusteredTableDDLSuite extends ClusteredTableDDLSuiteBase

trait ClusteredTableDDLWithNameColumnMapping
extends ClusteredTableCreateOrReplaceDDLSuite with DeltaColumnMappingEnableNameMode

@@ -191,7 +191,9 @@ class IncrementalZCubeClusteringSuite extends QueryTest

test("test changing clustering columns") {
withSQLConf(
SQLConf.MAX_RECORDS_PER_FILE.key -> "2") {
SQLConf.MAX_RECORDS_PER_FILE.key -> "2",
// Enable update catalog for verifyClusteringColumns.
DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") {
withClusteredTable(
table = table,
schema = "col1 int, col2 int",

