
Commit f22acc0: initial commit
Parent: 59f8c64

4 files changed: +77, -7 lines

spark/src/main/scala/org/apache/spark/sql/delta/DomainMetadataUtils.scala

Lines changed: 30 additions & 4 deletions
@@ -16,6 +16,7 @@
 
 package org.apache.spark.sql.delta
 
+import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
 import org.apache.spark.sql.delta.actions.{Action, DomainMetadata, Protocol}
 import org.apache.spark.sql.delta.clustering.ClusteringMetadataDomain
 import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -91,15 +92,40 @@ object DomainMetadataUtils extends DeltaLogging {
 
   /**
    * Generates a new sequence of DomainMetadata to commit for RESTORE TABLE.
-   * - Source (table to restore to) domains will be copied if they appear in the pre-defined
+   * - Domains in the toSnapshot will be copied if they appear in the pre-defined
    *   "copy" list (e.g., table features require some specific domains to be copied).
-   * - All other domains not in the list are "retained".
+   * - All other domains not in the list are dropped from the "toSnapshot".
+   *
+   * For clustering metadata domains, this overwrites the existing domain metadata in the
+   * fromSnapshot with the following clustering columns:
+   *   1. If toSnapshot is not a clustered table or is missing domain metadata, use empty
+   *      clustering columns.
+   *   2. If toSnapshot is a clustered table, use the clustering columns from toSnapshot.
+   *
+   * @param toSnapshot The snapshot being restored to, which is referred to as the "source" table.
+   * @param fromSnapshot The snapshot being restored from, which is the current state.
    */
   def handleDomainMetadataForRestoreTable(
-      sourceDomainMetadatas: Seq[DomainMetadata]): Seq[DomainMetadata] = {
-    sourceDomainMetadatas.filter { m =>
+      toSnapshot: Snapshot,
+      fromSnapshot: Snapshot): Seq[DomainMetadata] = {
+    val filteredDomainMetadata = toSnapshot.domainMetadata.filter { m =>
       METADATA_DOMAIN_TO_COPY_FOR_RESTORE_TABLE.contains(m.domain)
     }
+    val clusteringColumnsToRestore = ClusteredTableUtils.getClusteringColumnsOptional(toSnapshot)
+
+    val isRestoringToClusteredTable =
+      ClusteredTableUtils.isSupported(toSnapshot.protocol) && clusteringColumnsToRestore.nonEmpty
+    val clusteringColumns = if (isRestoringToClusteredTable) {
+      // Overwrite the clustering columns in the fromSnapshot with the clustering columns
+      // in the toSnapshot.
+      clusteringColumnsToRestore.get
+    } else {
+      // toSnapshot is not a clustered table or is missing domain metadata, so write domain
+      // metadata with empty clustering columns.
+      Seq.empty
+    }
+
+    filteredDomainMetadata ++ Seq(ClusteredTableUtils.createDomainMetadata(clusteringColumns))
   }
 
   /**
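To make the two clustering cases above concrete, here is a minimal sketch (illustrative only, not part of the commit). It assumes toSnapshot and fromSnapshot are already-resolved Snapshot instances and uses only names that appear in this diff:

    // Sketch: what the new helper returns for a restore.
    val restored: Seq[DomainMetadata] =
      DomainMetadataUtils.handleDomainMetadataForRestoreTable(toSnapshot, fromSnapshot)

    // Case 1: toSnapshot is unclustered or lacks a clustering domain. The result
    // still includes a clustering DomainMetadata, but with empty clustering columns,
    // so the restore explicitly clears whatever fromSnapshot recorded.
    // Case 2: toSnapshot is a clustered table. The clustering DomainMetadata in the
    // result carries toSnapshot's columns, overwriting fromSnapshot's.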

spark/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala

Lines changed: 5 additions & 2 deletions
@@ -21,7 +21,7 @@ import java.sql.Timestamp
 import scala.collection.JavaConverters._
 import scala.util.{Success, Try}
 
-import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaOperations, Snapshot}
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaOperations, DomainMetadataUtils, Snapshot}
 import org.apache.spark.sql.delta.actions.{AddFile, DeletionVectorDescriptor, RemoveFile}
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -205,9 +205,12 @@ case class RestoreTableCommand(sourceTable: DeltaTableV2)
           sourceProtocol.merge(targetProtocol)
         }
 
+        val actions = addActions ++ removeActions ++
+          DomainMetadataUtils.handleDomainMetadataForRestoreTable(snapshotToRestore, latestSnapshot)
+
         txn.commitLarge(
           spark,
-          addActions ++ removeActions,
+          actions,
           Some(newProtocol),
           DeltaOperations.Restore(version, timestamp),
           Map.empty,
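The argument order matters here: snapshotToRestore, the historical version the table is restored to, becomes the toSnapshot (the doc comment's "source"), while latestSnapshot, the table's current state, becomes the fromSnapshot. An equivalent call with named arguments (illustrative only, not part of the commit) makes the mapping explicit:

    // Same call as above, spelled out with named arguments.
    val domainMetadataActions: Seq[DomainMetadata] =
      DomainMetadataUtils.handleDomainMetadataForRestoreTable(
        toSnapshot = snapshotToRestore,  // the version being restored TO
        fromSnapshot = latestSnapshot)   // the current state, restored FROM
    val actions = addActions ++ removeActions ++ domainMetadataActions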

spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala

Lines changed: 2 additions & 1 deletion
@@ -152,7 +152,8 @@ trait ClusteredTableTestUtilsBase extends SparkFunSuite with SharedSparkSession
         } else {
           assertClusterByNotExist()
         }
-      case "WRITE" =>
+      case "WRITE" | "RESTORE" =>
+        // These are known operations from our tests that don't have clusterBy.
         doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
       case _ =>
         // Other operations are not tested yet. If the test fails here, please check the expected

spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala

Lines changed: 40 additions & 0 deletions
@@ -21,6 +21,7 @@ import java.io.File
 import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions}
 import org.apache.spark.sql.delta.skipping.ClusteredTableTestUtils
 import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingEnableIdMode, DeltaColumnMappingEnableNameMode, DeltaConfigs, DeltaExcludedBySparkVersionTestMixinShims, DeltaLog, DeltaUnsupportedOperationException}
+import org.apache.spark.sql.delta.clustering.ClusteringMetadataDomain
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.stats.SkippingEligibleDataType
 import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
@@ -825,6 +826,45 @@ trait ClusteredTableDDLSuiteBase
     }
   }
 
+  test("validate RESTORE on clustered table") {
+    val tableIdentifier = TableIdentifier(testTable)
+    // Scenario 1: restore a clustered table to an unclustered version.
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (a INT, b STRING) USING delta")
+      val (_, startingSnapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier)
+      assert(!ClusteredTableUtils.isSupported(startingSnapshot.protocol))
+
+      sql(s"ALTER TABLE $testTable CLUSTER BY (a)")
+      verifyClusteringColumns(tableIdentifier, "a")
+
+      sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0")
+      val (_, currentSnapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier)
+      verifyClusteringColumns(tableIdentifier, "")
+    }
+
+    // Scenario 2: restore a clustered table to its previous clustering columns.
+    withClusteredTable(testTable, "a INT, b STRING", "a") {
+      verifyClusteringColumns(tableIdentifier, "a")
+
+      sql(s"ALTER TABLE $testTable CLUSTER BY (b)")
+      verifyClusteringColumns(tableIdentifier, "b")
+
+      sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0")
+      verifyClusteringColumns(tableIdentifier, "a")
+    }
+
+    // Scenario 3: restore an unclustered table to a clustered version.
+    withClusteredTable(testTable, "a INT", "a") {
+      verifyClusteringColumns(tableIdentifier, "a")
+
+      sql(s"ALTER TABLE $testTable CLUSTER BY NONE")
+      verifyClusteringColumns(tableIdentifier, "")
+
+      sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0")
+      verifyClusteringColumns(tableIdentifier, "a")
+    }
+  }
+
 
   testSparkMasterOnly("Variant is not supported") {
     val e = intercept[DeltaAnalysisException] {
       createOrReplaceClusteredTable("CREATE", testTable, "id long, v variant", "v")
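Scenario 2 hinges on the table's version timeline. A compact sketch of the history that test builds (illustrative; version numbers assume no other commits land in between):

    // v0: table created clustered by (a)    -> clustering columns: a
    sql(s"CREATE TABLE $testTable (a INT, b STRING) USING delta CLUSTER BY (a)")
    // v1: clustering columns changed to (b) -> clustering columns: b
    sql(s"ALTER TABLE $testTable CLUSTER BY (b)")
    // v2: RESTORE back to version 0         -> clustering columns: a again
    sql(s"RESTORE TABLE $testTable TO VERSION AS OF 0")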
