From 7d9e6f3008b4d7ec7b0aabcecec3f612e8911557 Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Fri, 17 May 2024 17:19:03 +0200
Subject: [PATCH] More column mapping, CDF, RESTORE tests

---
 .../sql/delta/DeltaTypeWideningSuite.scala    | 403 ++++++++++++------
 1 file changed, 278 insertions(+), 125 deletions(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala
index e463e359fd3..8a2ac35d2d6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala
@@ -52,10 +52,13 @@ class DeltaTypeWideningSuite
     with RowTrackingTestUtils
     with DeltaSQLCommandTest
     with DeltaTypeWideningTestMixin
+    with DeltaTypeWideningDropFeatureTestMixin
     with DeltaTypeWideningAlterTableTests
     with DeltaTypeWideningNestedFieldsTests
+    with DeltaTypeWideningCompatibilityTests
     with DeltaTypeWideningMetadataTests
     with DeltaTypeWideningTableFeatureTests
+    with DeltaTypeWideningColumnMappingTests
     with DeltaTypeWideningStatsTests
     with DeltaTypeWideningConstraintsTests
     with DeltaTypeWideningGeneratedColumnTests
@@ -78,6 +81,17 @@ trait DeltaTypeWideningTestMixin extends SharedSparkSession with DeltaDMLTestUti
     sql(s"ALTER TABLE delta.`$tablePath` " +
       s"SET TBLPROPERTIES('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = '${enabled.toString}')")
 
+  /** Whether the test table supports the type widening table feature. */
+  def isTypeWideningSupported: Boolean = {
+    TypeWidening.isSupported(deltaLog.update().protocol)
+  }
+
+  /** Whether the type widening table property is enabled on the test table. */
+  def isTypeWideningEnabled: Boolean = {
+    val snapshot = deltaLog.update()
+    TypeWidening.isEnabled(snapshot.protocol, snapshot.metadata)
+  }
+
   /** Short-hand to create type widening metadata for struct fields. */
   protected def typeWideningMetadata(
       version: Long,
@@ -93,6 +107,93 @@ trait DeltaTypeWideningTestMixin extends SharedSparkSession with DeltaDMLTestUti
     append(values.toDF("a").select(col("a").cast(dataType)).repartition(1))
   }
 
+/**
+ * Mixin trait containing helpers to test dropping the type widening table feature.
+ */
+trait DeltaTypeWideningDropFeatureTestMixin
+  extends QueryTest
+    with SharedSparkSession
+    with DeltaDMLTestUtils {
+
+  /** Expected outcome of dropping the type widening table feature. */
+  object ExpectedOutcome extends Enumeration {
+    val SUCCESS, FAIL_CURRENT_VERSION_USES_FEATURE, FAIL_HISTORICAL_VERSION_USES_FEATURE = Value
+  }
+
+  /**
+   * Helper method to drop the type widening table feature and check for an expected outcome.
+   * Validates in particular that the right number of files were rewritten and that the rewritten
+   * files all contain the expected type for the specified columns.
+   */
+  def dropTableFeature(
+      expectedOutcome: ExpectedOutcome.Value,
+      expectedNumFilesRewritten: Long,
+      expectedColumnTypes: Map[String, DataType]): Unit = {
+    val snapshot = deltaLog.update()
+    // Need to directly call ALTER TABLE command to pass our deltaLog with manual clock.
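+    // The drop-feature flow exercised by the tests below is two-phased: a first DROP FEATURE
+    // call rewrites any files that still use a narrower physical type and fails, asking the
+    // caller to wait out the log retention period; calls made while historical versions still
+    // reference the feature also fail; once past the retention period (see
+    // advancePastRetentionPeriod()), the drop succeeds and the feature is removed from the
+    // table protocol.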
+    val dropFeature = AlterTableDropFeatureDeltaCommand(
+      DeltaTableV2(spark, deltaLog.dataPath),
+      TypeWideningTableFeature.name)
+
+    expectedOutcome match {
+      case ExpectedOutcome.SUCCESS =>
+        dropFeature.run(spark)
+      case ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE =>
+        checkError(
+          exception = intercept[DeltaTableFeatureException] { dropFeature.run(spark) },
+          errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD",
+          parameters = Map(
+            "feature" -> TypeWideningTableFeature.name,
+            "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key,
+            "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION
+              .fromMetaData(snapshot.metadata).toString,
+            "truncateHistoryLogRetentionPeriod" ->
+              DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION
+                .fromMetaData(snapshot.metadata).toString)
+        )
+      case ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE =>
+        checkError(
+          exception = intercept[DeltaTableFeatureException] { dropFeature.run(spark) },
+          errorClass = "DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST",
+          parameters = Map(
+            "feature" -> TypeWideningTableFeature.name,
+            "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key,
+            "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION
+              .fromMetaData(snapshot.metadata).toString,
+            "truncateHistoryLogRetentionPeriod" ->
+              DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION
+                .fromMetaData(snapshot.metadata).toString)
+        )
+    }
+
+    // Check the number of files rewritten and that all files now contain the right data type.
+    assert(getNumRemoveFilesSinceVersion(snapshot.version + 1) === expectedNumFilesRewritten,
+      s"Expected $expectedNumFilesRewritten file(s) to be rewritten but found " +
+      s"${getNumRemoveFilesSinceVersion(snapshot.version + 1)} rewritten file(s).")
+    assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema))
+
+    expectedColumnTypes.foreach { case (colName, expectedType) =>
+      withSQLConf("spark.databricks.delta.formatCheck.enabled" -> "false") {
+        deltaLog.update().filesForScan(Seq.empty, keepNumRecords = false).files.foreach { file =>
+          val filePath = DeltaFileOperations.absolutePath(deltaLog.dataPath.toString, file.path)
+          val data = spark.read.parquet(filePath.toString)
+          val physicalColName = DeltaColumnMapping.getPhysicalName(snapshot.schema(colName))
+          assert(data.schema(physicalColName).dataType === expectedType,
+            s"File with values ${data.collect().mkString(", ")} wasn't rewritten.")
+        }
+      }
+    }
+  }
+
+  /** Get the number of remove actions committed since the given table version (inclusive). */
+  def getNumRemoveFilesSinceVersion(version: Long): Long =
+    deltaLog
+      .getChanges(startVersion = version)
+      .flatMap { case (_, actions) => actions }
+      .collect { case r: RemoveFile => r }
+      .size
+}
+
 /**
  * Trait collecting supported and unsupported type change test cases.
  */
@@ -543,29 +644,41 @@ trait DeltaTypeWideningCompatibilityTests {
     checkAnswer(readCDF(start = 3, end = 3), Seq(Row(3, "insert"), Row(4, "insert")))
   }
 
-  test("change column type and rename it with column mapping") {
-    withSQLConf((DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey, IdMapping.name)) {
-      sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA")
+  test("reading CDF with a type change using read schema from before the change") {
+    withSQLConf((DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")) {
+      sql(s"CREATE TABLE delta.`$tempPath` (a smallint) USING DELTA")
     }
-    // Add some data and change type of column `a`.
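+    // The snapshot captured below, before the type change, serves as the CDF read schema;
+    // reading past the type change with it is expected to fail with
+    // DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_SCHEMA_CHANGE, as checked at the end of this test.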
-    append(Seq(1).toDF("a").select($"a".cast(ByteType)))
-    sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE smallint")
-    append(Seq(2).toDF("a").select($"a".cast(ShortType)))
-    assert(readDeltaTable(tempPath).schema("a").dataType === ShortType)
-    checkAnswer(sql(s"SELECT a FROM delta.`$tempPath`"), Seq(Row(1), Row(2)))
-
-    // Rename column `a` to `b`, add more data.
-    sql(s"ALTER TABLE delta.`$tempPath` RENAME COLUMN a TO b")
-    assert(readDeltaTable(tempPath).schema("b").dataType === ShortType)
-    checkAnswer(sql(s"SELECT b FROM delta.`$tempPath`"), Seq(Row(1), Row(2)))
-    append(Seq(3).toDF("b").select($"b".cast(ShortType)))
-    checkAnswer(sql(s"SELECT b FROM delta.`$tempPath`"), Seq(Row(1), Row(2), Row(3)))
-
-    // Change column type again, add more data.
-    sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN b TYPE int")
-    assert(readDeltaTable(tempPath).schema("b").dataType === IntegerType)
-    append(Seq(4).toDF("b"))
-    checkAnswer(sql(s"SELECT b FROM delta.`$tempPath`"), Seq(Row(1), Row(2), Row(3), Row(4)))
+    append(Seq(1, 2).toDF("a").select($"a".cast(ShortType)))
+    val readSchemaSnapshot = deltaLog.update()
+    sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int")
+    append(Seq(3, 4).toDF("a"))
+
+    def readCDF(start: Long, end: Long): DataFrame =
+      CDCReader
+        .changesToBatchDF(
+          deltaLog,
+          start,
+          end,
+          spark,
+          readSchemaSnapshot = Some(readSchemaSnapshot)
+        )
+        .drop(CDCReader.CDC_COMMIT_TIMESTAMP)
+        .drop(CDCReader.CDC_COMMIT_VERSION)
+
+    checkAnswer(readCDF(start = 1, end = 1), Seq(Row(1, "insert"), Row(2, "insert")))
+    checkErrorMatchPVals(
+      exception = intercept[DeltaUnsupportedOperationException] {
+        readCDF(start = 1, end = 3)
+      },
+      errorClass = "DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_SCHEMA_CHANGE",
+      parameters = Map(
+        "start" -> "1",
+        "end" -> "3",
+        "readSchema" -> ".*",
+        "readVersion" -> "1",
+        "incompatibleVersion" -> "2"
+      )
+    )
   }
 
   test("time travel read before type change") {
@@ -902,7 +1015,8 @@ trait DeltaTypeWideningTableFeatureTests {
   self: QueryTest
     with ParquetTest
     with RowTrackingTestUtils
-    with DeltaTypeWideningTestMixin =>
+    with DeltaTypeWideningTestMixin
+    with DeltaTypeWideningDropFeatureTestMixin =>
 
   import testImplicits._
 
@@ -916,78 +1030,6 @@ trait DeltaTypeWideningTableFeatureTests {
     deltaLog = DeltaLog.forTable(spark, new Path(tempPath), clock)
   }
 
-  def isTypeWideningSupported: Boolean = {
-    TypeWidening.isSupported(deltaLog.update().protocol)
-  }
-
-  def isTypeWideningEnabled: Boolean = {
-    val snapshot = deltaLog.update()
-    TypeWidening.isEnabled(snapshot.protocol, snapshot.metadata)
-  }
-
-  /** Expected outcome of dropping the type widening table feature. */
-  object ExpectedOutcome extends Enumeration {
-    val SUCCESS, FAIL_CURRENT_VERSION_USES_FEATURE, FAIL_HISTORICAL_VERSION_USES_FEATURE = Value
-  }
-
-  /** Helper method to drop the type widening table feature and check for an expected outcome. */
-  def dropTableFeature(
-      expectedOutcome: ExpectedOutcome.Value,
-      expectedNumFilesRewritten: Long,
-      expectedTypeAfterRewrite: DataType): Unit = {
-    val snapshot = deltaLog.update()
-    // Need to directly call ALTER TABLE command to pass our deltaLog with manual clock.
-    val dropFeature = AlterTableDropFeatureDeltaCommand(
-      DeltaTableV2(spark, deltaLog.dataPath),
-      TypeWideningTableFeature.name)
-
-    expectedOutcome match {
-      case ExpectedOutcome.SUCCESS =>
-        dropFeature.run(spark)
-      case ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE =>
-        checkError(
-          exception = intercept[DeltaTableFeatureException] { dropFeature.run(spark) },
-          errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD",
-          parameters = Map(
-            "feature" -> TypeWideningTableFeature.name,
-            "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key,
-            "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION
-              .fromMetaData(snapshot.metadata).toString,
-            "truncateHistoryLogRetentionPeriod" ->
-              DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION
-                .fromMetaData(snapshot.metadata).toString)
-        )
-      case ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE =>
-        checkError(
-          exception = intercept[DeltaTableFeatureException] { dropFeature.run(spark) },
-          errorClass = "DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST",
-          parameters = Map(
-            "feature" -> TypeWideningTableFeature.name,
-            "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key,
-            "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION
-              .fromMetaData(snapshot.metadata).toString,
-            "truncateHistoryLogRetentionPeriod" ->
-              DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION
-                .fromMetaData(snapshot.metadata).toString)
-        )
-    }
-
-    // Check the number of files rewritten and that all files now contain the right data type.
-    assert(getNumRemoveFilesSinceVersion(snapshot.version + 1) === expectedNumFilesRewritten,
-      s"Expected $expectedNumFilesRewritten file(s) to be rewritten but found " +
-      s"${getNumRemoveFilesSinceVersion(snapshot.version + 1)} rewritten file(s).")
-    assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema))
-
-    withSQLConf("spark.databricks.delta.formatCheck.enabled" -> "false") {
-      deltaLog.update().filesForScan(Seq.empty, keepNumRecords = false).files.foreach { file =>
-        val filePath = DeltaFileOperations.absolutePath(deltaLog.dataPath.toString, file.path)
-        val data = spark.read.parquet(filePath.toString)
-        assert(data.schema("a").dataType === expectedTypeAfterRewrite,
-          s"File with values ${data.collect().mkString(", ")} wasn't rewritten.")
-      }
-    }
-  }
-
   /**
    * Use this after dropping the table feature to artificially move the current time to after
    * the table retention period.
@@ -999,14 +1041,6 @@ trait DeltaTypeWideningTableFeatureTests {
       TimeUnit.MINUTES.toMillis(5))
   }
 
-  /** Get the number of remove actions committed since the given table version (included). */
-  def getNumRemoveFilesSinceVersion(version: Long): Long =
-    deltaLog
-      .getChanges(startVersion = version)
-      .flatMap { case (_, actions) => actions }
-      .collect { case r: RemoveFile => r }
-      .size
-
   test("enable type widening at table creation then disable it") {
     sql(s"CREATE TABLE delta.`$tempPath` (a int) USING DELTA " +
       s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true')")
@@ -1108,7 +1142,7 @@ trait DeltaTypeWideningTableFeatureTests {
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.SUCCESS,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = ByteType
+      expectedColumnTypes = Map("a" -> ByteType)
     )
     checkAnswer(readDeltaTable(tempPath), Seq.empty)
   }
@@ -1154,7 +1188,7 @@ trait DeltaTypeWideningTableFeatureTests {
     dropTableFeature(
      expectedOutcome = ExpectedOutcome.SUCCESS,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = ByteType
+      expectedColumnTypes = Map("a" -> ByteType)
     )
     checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3)))
   }
@@ -1171,19 +1205,19 @@ trait DeltaTypeWideningTableFeatureTests {
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 1,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
 
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
 
     advancePastRetentionPeriod()
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.SUCCESS,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
   }
@@ -1200,19 +1234,19 @@ trait DeltaTypeWideningTableFeatureTests {
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
 
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
 
     advancePastRetentionPeriod()
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.SUCCESS,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
     checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3)))
   }
@@ -1227,20 +1261,20 @@ trait DeltaTypeWideningTableFeatureTests {
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 1,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
 
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
    )
 
     advancePastRetentionPeriod()
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.SUCCESS,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
     checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3)))
   }
@@ -1257,20 +1291,20 @@ trait DeltaTypeWideningTableFeatureTests {
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
       // The file was already rewritten in UPDATE.
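+      // The UPDATE above stored the widened int values, so the DML itself already rewrote the
+      // data with the new physical type, leaving nothing for DROP FEATURE to rewrite.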
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
 
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
 
     advancePastRetentionPeriod()
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.SUCCESS,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
     checkAnswer(readDeltaTable(tempPath), Seq(Row(11), Row(12), Row(13)))
   }
@@ -1289,20 +1323,20 @@ trait DeltaTypeWideningTableFeatureTests {
       expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
       // One file was already rewritten in UPDATE, leaving 1 file to rewrite.
       expectedNumFilesRewritten = 1,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
 
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
 
     advancePastRetentionPeriod()
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.SUCCESS,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
     checkAnswer(
       readDeltaTable(tempPath),
@@ -1325,14 +1359,14 @@ trait DeltaTypeWideningTableFeatureTests {
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 1,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
 
     advancePastRetentionPeriod()
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.SUCCESS,
       expectedNumFilesRewritten = 0,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
     checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(1024)))
   }
@@ -1385,7 +1419,7 @@ trait DeltaTypeWideningTableFeatureTests {
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 1,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
   }
@@ -1414,12 +1448,27 @@ trait DeltaTypeWideningTableFeatureTests {
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
       expectedNumFilesRewritten = 3,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
       checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(200)))
     }
   }
 
+  test("RESTORE to before type change") {
+    addSingleFile(Seq(1), ShortType)
+    sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE INT")
+    sql(s"UPDATE delta.`$tempPath` SET a = ${Int.MinValue} WHERE a = 1")
+
+    // RESTORE to version 0, before the type change was applied.
+    sql(s"RESTORE TABLE delta.`$tempPath` VERSION AS OF 0")
+    checkAnswer(readDeltaTable(tempPath), Seq(Row(1)))
+    dropTableFeature(
+      // There should be no files to rewrite but versions before RESTORE still use the feature.
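+      // RESTORE resets the table state to version 0 but does not erase history: the type
+      // change and the int-typed data remain in earlier versions until they age out of the
+      // log retention period, so the drop still fails with the historical-versions error.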
+      expectedOutcome = ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE,
+      expectedNumFilesRewritten = 0,
+      expectedColumnTypes = Map("a" -> ShortType)
+    )
+  }
 
   test("dropping feature after RESTORE correctly rewrite files with old type") {
     addSingleFile(Seq(1), ShortType)
@@ -1439,12 +1488,100 @@ trait DeltaTypeWideningTableFeatureTests {
       expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
       // Both files added before the type change must be rewritten.
       expectedNumFilesRewritten = 2,
-      expectedTypeAfterRewrite = IntegerType
+      expectedColumnTypes = Map("a" -> IntegerType)
     )
     checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2)))
   }
 }
 
+/** Trait collecting tests covering type widening + column mapping. */
+trait DeltaTypeWideningColumnMappingTests {
+  self: QueryTest
+    with DeltaTypeWideningTestMixin
+    with DeltaTypeWideningDropFeatureTestMixin =>
+
+  import testImplicits._
+
+  for (mappingMode <- Seq(IdMapping.name, NameMapping.name)) {
+    test(s"change column type and rename it, mappingMode=$mappingMode") {
+      withSQLConf((DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey, mappingMode)) {
+        sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA")
+      }
+      // Add some data and change type of column `a`.
+      addSingleFile(Seq(1), ByteType)
+      sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE smallint")
+      addSingleFile(Seq(2), ShortType)
+      assert(readDeltaTable(tempPath).schema("a").dataType === ShortType)
+      checkAnswer(sql(s"SELECT a FROM delta.`$tempPath`"), Seq(Row(1), Row(2)))
+
+      // Rename column `a` to `a (with reserved characters)`, add more data.
+      val newColumnName = "a (with reserved characters)"
+      sql(s"ALTER TABLE delta.`$tempPath` RENAME COLUMN a TO `$newColumnName`")
+      assert(readDeltaTable(tempPath).schema(newColumnName).dataType === ShortType)
+      checkAnswer(
+        sql(s"SELECT `$newColumnName` FROM delta.`$tempPath`"), Seq(Row(1), Row(2))
+      )
+      append(Seq(3).toDF(newColumnName).select(col(newColumnName).cast(ShortType)))
+      checkAnswer(
+        sql(s"SELECT `$newColumnName` FROM delta.`$tempPath`"), Seq(Row(1), Row(2), Row(3))
+      )
+
+      // Change column type again, add more data.
+      sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN `$newColumnName` TYPE int")
+      assert(readDeltaTable(tempPath).schema(newColumnName).dataType === IntegerType)
+      append(Seq(4).toDF(newColumnName).select(col(newColumnName).cast(IntegerType)))
+      checkAnswer(
+        sql(s"SELECT `$newColumnName` FROM delta.`$tempPath`"),
+        Seq(Row(1), Row(2), Row(3), Row(4))
+      )
+
+      dropTableFeature(
+        expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
+        // All files except the last one should be rewritten.
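+        // Three files were written while the column was byte/short; only the file added after
+        // the final change to int already stores the int physical type and can be kept.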
+        expectedNumFilesRewritten = 3,
+        expectedColumnTypes = Map(newColumnName -> IntegerType)
+      )
+    }
+
+    test(s"dropped column shouldn't cause files to be rewritten, mappingMode=$mappingMode") {
+      withSQLConf((DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey, mappingMode)) {
+        sql(s"CREATE TABLE delta.`$tempPath` (a byte, b byte) USING DELTA")
+      }
+      sql(s"INSERT INTO delta.`$tempPath` VALUES (1, 1)")
+      sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN b TYPE int")
+      sql(s"INSERT INTO delta.`$tempPath` VALUES (2, 2)")
+      sql(s"ALTER TABLE delta.`$tempPath` DROP COLUMN b")
+      dropTableFeature(
+        expectedOutcome = ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE,
+        expectedNumFilesRewritten = 0,
+        expectedColumnTypes = Map("a" -> ByteType)
+      )
+    }
+
+    test(s"swap column names and change type, mappingMode=$mappingMode") {
+      withSQLConf((DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey, mappingMode)) {
+        sql(s"CREATE TABLE delta.`$tempPath` (a byte, b byte) USING DELTA")
+      }
+      sql(s"INSERT INTO delta.`$tempPath` VALUES (1, 1)")
+      sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN b TYPE int")
+      sql(s"INSERT INTO delta.`$tempPath` VALUES (2, 2)")
+      sql(s"ALTER TABLE delta.`$tempPath` RENAME COLUMN b TO c")
+      sql(s"ALTER TABLE delta.`$tempPath` RENAME COLUMN a TO b")
+      sql(s"ALTER TABLE delta.`$tempPath` RENAME COLUMN c TO a")
+      sql(s"INSERT INTO delta.`$tempPath` VALUES (3, 3)")
+      dropTableFeature(
+        expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
+        expectedNumFilesRewritten = 1,
+        expectedColumnTypes = Map(
+          "a" -> IntegerType,
+          "b" -> ByteType
+        )
+      )
+    }
+  }
+}
+
 trait DeltaTypeWideningConstraintsTests {
   self: QueryTest
     with SharedSparkSession =>
@@ -1570,6 +1707,22 @@ trait DeltaTypeWideningConstraintsTests {
       }
     }
   }
+
+  test("add constraint after type change then RESTORE") {
+    withTable("t") {
+      sql("CREATE TABLE t (a byte) USING DELTA")
+      sql("INSERT INTO t VALUES (2)")
+      sql("ALTER TABLE t CHANGE COLUMN a TYPE INT")
+      sql("INSERT INTO t VALUES (5)")
+      checkAnswer(sql("SELECT a, hash(a) FROM t"), Seq(Row(2, 1765031574), Row(5, 1023896466)))
+      sql("ALTER TABLE t ADD CONSTRAINT ck CHECK (hash(a) > 0)")
+      // Constraints are stored in the table metadata; RESTORE removes the constraint, so the
+      // type change can't get in the way.
+      sql(s"RESTORE TABLE t VERSION AS OF 1")
+      sql("INSERT INTO t VALUES (1)")
+      checkAnswer(sql("SELECT a, hash(a) FROM t"), Seq(Row(2, 1765031574), Row(1, -559580957)))
+    }
+  }
 }
 
 trait DeltaTypeWideningGeneratedColumnTests extends GeneratedColumnTest {