[Spark] Support dropping the CHECK constraints table feature (#2987)
#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR adds support for dropping the `checkConstraints` table feature
using the `ALTER TABLE ... DROP FEATURE` command. It throws an error if
the table still contains CHECK constraints (as dropping a feature should
never change the logical state of a table).
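
A minimal sketch of the resulting workflow, mirroring the new test (the table name `events` and constraint name `c1` are hypothetical):

```scala
sql("ALTER TABLE events DROP FEATURE checkConstraints")
// Fails with DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE:
// "The following constraints must be dropped first: `c1`."

sql("ALTER TABLE events DROP CONSTRAINT c1")
sql("ALTER TABLE events DROP FEATURE checkConstraints")  // now succeeds
```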

## How was this patch tested?

Added a test to `CheckConstraintsSuite`.

## Does this PR introduce _any_ user-facing changes?

Yes, `ALTER TABLE ... DROP FEATURE checkConstraints` is now supported.
tomvanbussel authored Jun 13, 2024
1 parent 4fac1f1 commit 5a6f382
Showing 6 changed files with 92 additions and 1 deletion.
7 changes: 7 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -170,6 +170,13 @@
],
"sqlState" : "42703"
},
"DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE" : {
"message" : [
"Cannot drop the CHECK constraints table feature.",
"The following constraints must be dropped first: <constraints>."
],
"sqlState" : "0AKDE"
},
"DELTA_CANNOT_EVALUATE_EXPRESSION" : {
"message" : [
"Cannot evaluate expression: <expression>"
@@ -336,6 +336,13 @@ trait DeltaErrorsBase
messageParameters = Array.empty)
}

def cannotDropCheckConstraintFeature(constraintNames: Seq[String]): AnalysisException = {
new DeltaAnalysisException(
errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE",
messageParameters = Array(constraintNames.map(formatColumn).mkString(", "))
)
}
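
For reference, a sketch of what this helper produces (constraint names here are hypothetical): `formatColumn` backtick-quotes each name before it is interpolated into the `<constraints>` placeholder of the error class above.

```scala
// Raises an AnalysisException whose message ends with:
//   "The following constraints must be dropped first: `c1`, `c2`."
throw DeltaErrors.cannotDropCheckConstraintFeature(Seq("c1", "c2"))
```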

def incorrectLogStoreImplementationException(
sparkConf: SparkConf,
cause: Throwable): Throwable = {
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.managedcommit.ManagedCommitUtils
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
@@ -378,3 +379,22 @@ case class ColumnMappingPreDowngradeCommand(table: DeltaTableV2)
true
}
}

case class CheckConstraintsPreDowngradeTableFeatureCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand {

/**
* Throws an exception if the table has CHECK constraints, and returns false otherwise (as no
* action was required).
*
* We intentionally error out instead of removing the CHECK constraints here, as dropping a
* table feature should never alter the logical representation of a table (only its physical
* representation). Instead, we ask the user to explicitly drop the constraints before the table
* feature can be dropped.
*/
override def removeFeatureTracesIfNeeded(): Boolean = {
val checkConstraintNames = Constraints.getCheckConstraintNames(table.initialSnapshot.metadata)
if (checkConstraintNames.isEmpty) return false
throw DeltaErrors.cannotDropCheckConstraintFeature(checkConstraintNames)
}
}
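
A minimal usage sketch of the new command in isolation (assuming `table` is an already-resolved `DeltaTableV2`):

```scala
// Returns false when the table has no CHECK constraints (no traces to remove);
// throws DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE otherwise.
val tracesRemoved = CheckConstraintsPreDowngradeTableFeatureCommand(table)
  .removeFeatureTracesIfNeeded()
```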
@@ -457,12 +457,24 @@ object InvariantsTableFeature

object CheckConstraintsTableFeature
extends LegacyWriterFeature(name = "checkConstraints", minWriterVersion = 3)
with FeatureAutomaticallyEnabledByMetadata {
with FeatureAutomaticallyEnabledByMetadata
with RemovableFeature {
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
Constraints.getCheckConstraints(metadata, spark).nonEmpty
}

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
CheckConstraintsPreDowngradeTableFeatureCommand(table)

override def validateRemoval(snapshot: Snapshot): Boolean =
Constraints.getCheckConstraintNames(snapshot.metadata).isEmpty

override def actionUsesFeature(action: Action): Boolean = {
// This method is never called, as it is only used for ReaderWriterFeatures.
throw new UnsupportedOperationException()
}
}
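
Pulling the pieces together, a simplified sketch of the assumed `DROP FEATURE` control flow; the actual driver command is not part of this diff, and only `preDowngradeCommand`, `removeFeatureTracesIfNeeded`, and `validateRemoval` are taken from the code above.

```scala
// Assumed, simplified flow; the function name and protocol-downgrade step
// are illustrative, not the real implementation.
def dropFeature(table: DeltaTableV2, feature: RemovableFeature): Unit = {
  // For checkConstraints this throws while any CHECK constraint remains.
  feature.preDowngradeCommand(table).removeFeatureTracesIfNeeded()
  // Verify no traces remain before writing the protocol downgrade.
  require(feature.validateRemoval(table.initialSnapshot))
  // ... commit a new protocol that no longer lists the feature ...
}
```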

object ChangeDataFeedTableFeature
@@ -49,6 +49,13 @@ object Constraints {
/** A SQL expression to check for when writing out data. */
case class Check(name: String, expression: Expression) extends Constraint

def getCheckConstraintNames(metadata: Metadata): Seq[String] = {
metadata.configuration.keys.collect {
case key if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
key.stripPrefix("delta.constraints.")
}.toSeq
}
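
`ALTER TABLE ... ADD CONSTRAINT` stores each CHECK constraint as a table property keyed `delta.constraints.<name>`, so the helper above reduces to key stripping. A self-contained sketch of the same logic, with hypothetical keys standing in for `metadata.configuration`:

```scala
import java.util.Locale

// Stand-in for metadata.configuration; the keys are hypothetical examples.
val configuration = Map(
  "delta.constraints.c1" -> "a > 0",
  "delta.constraints.c2" -> "b > 0",
  "delta.appendOnly" -> "false")

val names = configuration.keys.collect {
  case key if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
    key.stripPrefix("delta.constraints.")
}.toSeq
// names == Seq("c1", "c2") (ordering may vary for a Map)
```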

/**
* Extract CHECK constraints from the table properties. Note that some CHECK constraints may also
* come from schema metadata; these constraints were never released in a public API but are
@@ -18,6 +18,9 @@ package org.apache.spark.sql.delta.schema

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.DeltaLog

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.constraints.CharVarcharConstraint
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -459,4 +462,39 @@ class CheckConstraintsSuite extends QueryTest
)
}
}

test("drop table feature") {
withTable("table") {
sql("CREATE TABLE table (a INT, b INT) USING DELTA " +
"TBLPROPERTIES ('delta.feature.checkConstraints' = 'supported')")
sql("ALTER TABLE table ADD CONSTRAINT c1 CHECK (a > 0)")
sql("ALTER TABLE table ADD CONSTRAINT c2 CHECK (b > 0)")

val error1 = intercept[AnalysisException] {
sql("ALTER TABLE table DROP FEATURE checkConstraints")
}
checkError(
error1,
errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE",
parameters = Map("constraints" -> "`c1`, `c2`")
)
val deltaLog = DeltaLog.forTable(spark, TableIdentifier("table"))
assert(deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints"))

sql("ALTER TABLE table DROP CONSTRAINT c1")
val error2 = intercept[AnalysisException] {
sql("ALTER TABLE table DROP FEATURE checkConstraints")
}
checkError(
error2,
errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE",
parameters = Map("constraints" -> "`c2`")
)
assert(deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints"))

sql("ALTER TABLE table DROP CONSTRAINT c2")
sql("ALTER TABLE table DROP FEATURE checkConstraints")
assert(!deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints"))
}
}
}
