diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
index 4bf6435ed7f..96a17b72b82 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
@@ -23,9 +23,9 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
 import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
 import org.apache.spark.sql.delta.managedcommit.ManagedCommitUtils
 import org.apache.spark.sql.delta.metering.DeltaLogging
-import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import org.apache.spark.sql.util.ScalaExtensions._
 
@@ -309,8 +309,9 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
    * @return Return the number of files rewritten.
    */
   private def rewriteFilesIfNeeded(): Long = {
-    val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot)
-    if (numFilesToRewrite == 0L) return 0L
+    if (!TypeWideningMetadata.containsTypeWideningMetadata(table.initialSnapshot.schema)) {
+      return 0L
+    }
 
     // Wrap `table` in a ResolvedTable that can be passed to DeltaReorgTableCommand. The catalog &
     // table ID won't be used by DeltaReorgTableCommand.
@@ -323,8 +324,9 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
       reorgTableSpec = DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None)
     )(Nil)
 
-    reorg.run(table.spark)
-    numFilesToRewrite
+    val rows = reorg.run(table.spark)
+    val metrics = rows.head.getAs[OptimizeMetrics](1)
+    metrics.numFilesRemoved
   }
 
   /**
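With this change, rewriteFilesIfNeeded no longer pre-counts the files to rewrite. It only checks whether the schema still carries type widening metadata, runs the REORG command, and reports the number of files the command actually removed, taken from the OptimizeMetrics in the command's result row. A minimal sketch of that result-extraction pattern follows; it is illustrative only and not part of the patch. RewriteMetrics and runReorg are hypothetical stand-ins for OptimizeMetrics and DeltaReorgTableCommand.run; only the row shape (metrics object at ordinal 1) mirrors the code above.

    import org.apache.spark.sql.Row

    // Illustrative stand-in for the single OptimizeMetrics field used above.
    case class RewriteMetrics(numFilesRemoved: Long)

    // Hypothetical runner returning rows shaped like (tablePath, metrics), the shape
    // assumed by `rows.head.getAs[OptimizeMetrics](1)` in the patch.
    def runReorg(): Seq[Row] = Seq(Row("/path/to/table", RewriteMetrics(numFilesRemoved = 2L)))

    def rewrittenFileCount(): Long = {
      val rows = runReorg()
      // Pull the metrics object out of the first result row and report the file count.
      if (rows.isEmpty) 0L else rows.head.getAs[RewriteMetrics](1).numFilesRemoved
    }

    // rewrittenFileCount() == 2L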
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
index c71de8ea0b5..e6e75e26532 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -96,33 +96,4 @@ object TypeWidening {
       )
     }
   }
-
-  /**
-   * Filter the given list of files to only keep files that were written before the latest type
-   * change, if any. These older files contain a column or field with a type that is different than
-   * in the current table schema and must be rewritten when dropping the type widening table feature
-   * to make the table readable by readers that don't support the feature.
-   */
-  def filterFilesRequiringRewrite(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
-    TypeWideningMetadata.getLatestTypeChangeVersion(snapshot.metadata.schema) match {
-      case Some(latestVersion) =>
-        files.filter(_.defaultRowCommitVersion match {
-          case Some(version) => version < latestVersion
-          // Files written before the type widening table feature was added to the table don't
-          // have a defaultRowCommitVersion. That does mean they were written before the latest
-          // type change.
-          case None => true
-        })
-      case None =>
-        Seq.empty
-    }
-
-
-  /**
-   * Return the number of files that were written before the latest type change and that then
-   * contain a column or field with a type that is different from the current table schema.
-   */
-  def numFilesRequiringRewrite(snapshot: Snapshot): Long = {
-    filterFilesRequiringRewrite(snapshot, snapshot.allFiles.collect()).size
-  }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala
index f78ef60ae29..1547a9629ae 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala
@@ -16,7 +16,7 @@
 
 package org.apache.spark.sql.delta.commands
 
-import org.apache.spark.sql.delta.{Snapshot, TypeWidening}
+import org.apache.spark.sql.delta.{DeltaColumnMapping, Snapshot}
 import org.apache.spark.sql.delta.actions.AddFile
 
 import org.apache.spark.sql.{Row, SparkSession}
@@ -97,14 +97,15 @@ sealed trait DeltaReorgOperation {
    * Collects files that need to be processed by the reorg operation from the list of candidate
    * files.
    */
-  def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile]
+  def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile]
 }
 
 /**
  * Reorg operation to purge files with soft deleted rows.
  */
 class DeltaPurgeOperation extends DeltaReorgOperation {
-  override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
+  override def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile])
+    : Seq[AddFile] =
     files.filter { file =>
       (file.deletionVector != null && file.numPhysicalRecords.isEmpty) ||
         file.numDeletedRecords > 0L
@@ -115,7 +116,8 @@ class DeltaPurgeOperation extends DeltaReorgOperation {
  * Reorg operation to upgrade the iceberg compatibility version of a table.
  */
 class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorgOperation {
-  override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = {
+  override def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile])
+    : Seq[AddFile] = {
     def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = {
       if (file.tags == null) return true
       val icebergCompatVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0")
@@ -129,7 +131,12 @@ class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorg
  * Internal reorg operation to rewrite files to conform to the current table schema when dropping
 * the type widening table feature.
 */
-class DeltaRewriteTypeWideningOperation extends DeltaReorgOperation {
-  override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
-    TypeWidening.filterFilesRequiringRewrite(snapshot, files)
+class DeltaRewriteTypeWideningOperation extends DeltaReorgOperation with ReorgTableHelper {
+  override def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile])
+    : Seq[AddFile] = {
+    val physicalSchema = DeltaColumnMapping.renameColumns(snapshot.schema)
+    filterParquetFilesOnExecutors(spark, files, snapshot, ignoreCorruptFiles = false) {
+      schema => fileHasDifferentTypes(schema, physicalSchema)
+    }
+  }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index d1cbcb62997..711e05eb9c2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -265,8 +265,10 @@ class OptimizeExecutor(
     val partitionSchema = txn.metadata.partitionSchema
 
     val filesToProcess = optimizeContext.reorg match {
-      case Some(reorgOperation) => reorgOperation.filterFilesToReorg(txn.snapshot, candidateFiles)
-      case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
+      case Some(reorgOperation) =>
+        reorgOperation.filterFilesToReorg(sparkSession, txn.snapshot, candidateFiles)
+      case None =>
+        filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
     }
     val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableHelper.scala
new file mode 100644
index 00000000000..3635fec73d0
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableHelper.scala
@@ -0,0 +1,111 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.commands
+
+import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.commands.VacuumCommand.generateCandidateFileMap
+import org.apache.spark.sql.delta.schema.SchemaMergingUtils
+import org.apache.spark.sql.delta.util.DeltaFileOperations
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetToSparkSchemaConverter}
+import org.apache.spark.sql.types.{AtomicType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+trait ReorgTableHelper extends Serializable {
+  /**
+   * Determine whether `fileSchema` has any column with a type that differs from the
+   * corresponding column in `tablePhysicalSchema`.
+   */
+  protected def fileHasDifferentTypes(
+      fileSchema: StructType,
+      tablePhysicalSchema: StructType): Boolean = {
+    SchemaMergingUtils.transformColumns(fileSchema, tablePhysicalSchema) {
+      case (_, StructField(_, fileType: AtomicType, _, _),
+          Some(StructField(_, tableType: AtomicType, _, _)), _) if fileType != tableType =>
+        return true
+      case (_, field, _, _) => field
+    }
+    false
+  }
+
+  /**
+   * Apply a filter on the list of AddFile to only keep the files that have a physical parquet
+   * schema that satisfies the given filter function.
+   *
+   * Note: Filtering happens on the executors: **any variable captured by `filterFileFn` must be
+   * Serializable**
+   */
+  protected def filterParquetFilesOnExecutors(
+      spark: SparkSession,
+      files: Seq[AddFile],
+      snapshot: Snapshot,
+      ignoreCorruptFiles: Boolean)(
+      filterFileFn: StructType => Boolean): Seq[AddFile] = {
+
+    val serializedConf = new SerializableConfiguration(snapshot.deltaLog.newDeltaHadoopConf())
+    val assumeBinaryIsString = spark.sessionState.conf.isParquetBinaryAsString
+    val assumeInt96IsTimestamp = spark.sessionState.conf.isParquetINT96AsTimestamp
+    val dataPath = new Path(snapshot.deltaLog.dataPath.toString)
+
+    import org.apache.spark.sql.delta.implicits._
+
+    files.toDF(spark).as[AddFile].mapPartitions { iter =>
+      filterParquetFiles(iter.toList, dataPath, serializedConf.value, ignoreCorruptFiles,
+        assumeBinaryIsString, assumeInt96IsTimestamp)(filterFileFn).toIterator
+    }.collect()
+  }
+
+  protected def filterParquetFiles(
+      files: Seq[AddFile],
+      dataPath: Path,
+      configuration: Configuration,
+      ignoreCorruptFiles: Boolean,
+      assumeBinaryIsString: Boolean,
+      assumeInt96IsTimestamp: Boolean)(
+      filterFileFn: StructType => Boolean): Seq[AddFile] = {
+    val nameToAddFileMap = generateCandidateFileMap(dataPath, files)
+
+    val fileStatuses = nameToAddFileMap.map { case (absPath, addFile) =>
+      new FileStatus(
+        /* length */ addFile.size,
+        /* isDir */ false,
+        /* blockReplication */ 0,
+        /* blockSize */ 1,
+        /* modificationTime */ addFile.modificationTime,
+        new Path(absPath)
+      )
+    }
+
+    val footers = DeltaFileOperations.readParquetFootersInParallel(
+      configuration,
+      fileStatuses.toList,
+      ignoreCorruptFiles)
+
+    val converter =
+      new ParquetToSparkSchemaConverter(assumeBinaryIsString, assumeInt96IsTimestamp)
+
+    val filesNeedToRewrite = footers.filter { footer =>
+      val fileSchema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
+      filterFileFn(fileSchema)
+    }.map(_.getFile.toString)
+    filesNeedToRewrite.map(absPath => nameToAddFileMap(absPath))
+  }
+}
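The predicate above decides which files get rewritten: a file needs a rewrite only if its Parquet footer's physical schema still uses a type narrower than the table's current physical schema. The sketch below re-implements that check with plain Spark types so it can be read in isolation; hasTypeMismatch is an illustrative stand-in, not the trait's method (the real code uses SchemaMergingUtils.transformColumns over column-mapping-aware physical names and handles nesting more generally).

    import org.apache.spark.sql.types._

    // Simplified stand-in for fileHasDifferentTypes: walk both schemas in parallel and
    // report any leaf column whose data type differs. Only struct nesting is handled here.
    def hasTypeMismatch(fileSchema: StructType, tableSchema: StructType): Boolean =
      fileSchema.fields.exists { fileField =>
        tableSchema.fields.find(_.name == fileField.name).exists { tableField =>
          (fileField.dataType, tableField.dataType) match {
            case (f: StructType, t: StructType) => hasTypeMismatch(f, t)
            case (f, t) => f != t
          }
        }
      }

    // A file written before `value` was widened from SMALLINT to INT is flagged for rewrite:
    val oldFileSchema = StructType(Seq(StructField("value", ShortType)))
    val tableSchema = StructType(Seq(StructField("value", IntegerType)))
    assert(hasTypeMismatch(oldFileSchema, tableSchema))

    // A file already written with the widened type is left untouched:
    assert(!hasTypeMismatch(tableSchema, tableSchema))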
+ */ + protected def fileHasDifferentTypes( + fileSchema: StructType, + tablePhysicalSchema: StructType): Boolean = { + SchemaMergingUtils.transformColumns(fileSchema, tablePhysicalSchema) { + case (_, StructField(_, fileType: AtomicType, _, _), + Some(StructField(_, tableType: AtomicType, _, _)), _) if fileType != tableType => + return true + case (_, field, _, _) => field + } + false + } + + /** + * Apply a filter on the list of AddFile to only keep the files that have physical parquet schema + * that satisfies the given filter function. + * + * Note: Filtering happens on the executors: **any variable captured by `filterFileFn` must be + * Serializable** + */ + protected def filterParquetFilesOnExecutors( + spark: SparkSession, + files: Seq[AddFile], + snapshot: Snapshot, + ignoreCorruptFiles: Boolean)( + filterFileFn: StructType => Boolean): Seq[AddFile] = { + + val serializedConf = new SerializableConfiguration(snapshot.deltaLog.newDeltaHadoopConf()) + val assumeBinaryIsString = spark.sessionState.conf.isParquetBinaryAsString + val assumeInt96IsTimestamp = spark.sessionState.conf.isParquetINT96AsTimestamp + val dataPath = new Path(snapshot.deltaLog.dataPath.toString) + + import org.apache.spark.sql.delta.implicits._ + + files.toDF(spark).as[AddFile].mapPartitions { iter => + filterParquetFiles(iter.toList, dataPath, serializedConf.value, ignoreCorruptFiles, + assumeBinaryIsString, assumeInt96IsTimestamp)(filterFileFn).toIterator + }.collect() + } + + protected def filterParquetFiles( + files: Seq[AddFile], + dataPath: Path, + configuration: Configuration, + ignoreCorruptFiles: Boolean, + assumeBinaryIsString: Boolean, + assumeInt96IsTimestamp: Boolean)( + filterFileFn: StructType => Boolean): Seq[AddFile] = { + val nameToAddFileMap = generateCandidateFileMap(dataPath, files) + + val fileStatuses = nameToAddFileMap.map { case (absPath, addFile) => + new FileStatus( + /* length */ addFile.size, + /* isDir */ false, + /* blockReplication */ 0, + /* blockSize */ 1, + /* modificationTime */ addFile.modificationTime, + new Path(absPath) + ) + } + + val footers = DeltaFileOperations.readParquetFootersInParallel( + configuration, + fileStatuses.toList, + ignoreCorruptFiles) + + val converter = + new ParquetToSparkSchemaConverter(assumeBinaryIsString, assumeInt96IsTimestamp) + + val filesNeedToRewrite = footers.filter { footer => + val fileSchema = ParquetFileFormat.readSchemaFromFooter(footer, converter) + filterFileFn(fileSchema) + }.map(_.getFile.toString) + filesNeedToRewrite.map(absPath => nameToAddFileMap(absPath)) + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index e166564e828..518ade767c5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -16,6 +16,7 @@ package org.apache.spark.sql.delta +import java.io.{File, PrintWriter} import java.util.concurrent.TimeUnit import com.databricks.spark.util.Log4jUsageLogger @@ -1017,7 +1018,7 @@ trait DeltaTypeWideningStatsTests { * Tests covering adding and removing the type widening table feature. Dropping the table feature * also includes rewriting data files with the old type and removing type widening metadata. 
@@ -1351,6 +1352,21 @@ trait DeltaTypeWideningTableFeatureTests {
     }
   }
 
+  for {
+    testCase <- supportedTestCases
+  }
+  test(s"drop feature after type change ${testCase.fromType.sql} -> ${testCase.toType.sql}") {
+    append(testCase.initialValuesDF.repartition(2))
+    sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE ${testCase.toType.sql}")
+    append(testCase.additionalValuesDF.repartition(3))
+    dropTableFeature(
+      expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
+      expectedNumFilesRewritten = 2,
+      expectedColumnTypes = Map("value" -> testCase.toType)
+    )
+    checkAnswer(readDeltaTable(tempPath), testCase.expectedResult)
+  }
+
   test("drop feature after a type change with schema evolution") {
     setupManualClock()
     sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA")
@@ -1453,7 +1469,7 @@ trait DeltaTypeWideningTableFeatureTests {
 
     dropTableFeature(
       expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
-      expectedNumFilesRewritten = 3,
+      expectedNumFilesRewritten = 2,
       expectedColumnTypes = Map("a" -> IntegerType)
     )
     checkAnswer(readDeltaTable(tempPath),
@@ -1498,6 +1514,26 @@ trait DeltaTypeWideningTableFeatureTests {
     )
     checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2)))
   }
+
+  test("rewriting files fails if there are corrupted files") {
+    sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA")
+    sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE INT")
+    addSingleFile(Seq(2), IntegerType)
+    addSingleFile(Seq(3), IntegerType)
+    val filePath = deltaLog.update().allFiles.first().path
+    val pw = new PrintWriter(new File(tempPath, filePath))
+    pw.write("corrupted")
+    pw.close()
+
+    // Rewriting files when dropping type widening should ignore this config: if the corruption
+    // is transient, skipping the file would leave behind files that some clients can't read.
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      val ex = intercept[SparkException] {
+        sql(s"ALTER TABLE delta.`$tempPath` DROP FEATURE '${TypeWideningTableFeature.name}'")
+      }
+      assert(ex.getMessage.contains("Cannot seek after EOF"))
+    }
+  }
 }
 
 /** Trait collecting tests covering type widening + column mapping. */
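The tests above drive the rewrite through Delta's test helpers. For reference, roughly the same end-to-end flow can be reproduced against a Delta-enabled SparkSession with plain SQL. This is an illustrative sketch, not part of the patch: the scratch path is arbitrary, and the exact feature name string accepted by DROP FEATURE (for example 'typeWidening' vs. 'typeWidening-preview') depends on the Delta release in use.

    // Assumes a SparkSession `spark` configured with Delta Lake.
    val path = "/tmp/type_widening_demo"
    spark.sql(s"CREATE TABLE delta.`$path` (value SMALLINT) USING DELTA " +
      "TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
    spark.sql(s"INSERT INTO delta.`$path` VALUES (1), (2)")

    // Widen the column, then add a file that already uses the widened type.
    spark.sql(s"ALTER TABLE delta.`$path` CHANGE COLUMN value TYPE INT")
    spark.sql(s"INSERT INTO delta.`$path` VALUES (2147483647)")

    // Dropping the feature runs the pre-downgrade rewrite exercised above: only files whose
    // Parquet footers still carry the narrower type are rewritten. As in the tests, the first
    // attempt is expected to fail with a message that the current version still uses the
    // feature and to ask for a retry after the retention window (or the TRUNCATE HISTORY form).
    spark.sql(s"ALTER TABLE delta.`$path` DROP FEATURE 'typeWidening'")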