# [Spark] Drop Type widening feature: read Parquet footers to collect files to rewrite (#3155)

## What changes were proposed in this pull request?
The initial approach for identifying which files contain a type that differs
from the table schema, and therefore must be rewritten before dropping the
type widening table feature, is convoluted and turned out to be more brittle
than intended.

This change switches instead to directly reading the file schema from
the Parquet footer and rewriting all files that have a mismatching type.
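As a rough illustration of the new approach, here is a minimal sketch (not the PR's code, which lives in the new `ReorgTableHelper` trait further down and runs on executors) of deciding from the Parquet footer alone whether a file must be rewritten. It assumes access to Spark's internal `ParquetToSparkSchemaConverter`, hardcodes the two converter flags that the real code takes from the session config, and only compares top-level columns, whereas the real code recurses into nested fields.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter
import org.apache.spark.sql.types.{AtomicType, StructType}

object ParquetFooterCheckSketch {
  /** True if the file stores any top-level atomic column with a type that
   *  differs from the table's physical schema. */
  def fileNeedsRewrite(
      filePath: Path,
      tablePhysicalSchema: StructType,
      hadoopConf: Configuration): Boolean = {
    // Only the footer is read, never the data pages.
    val footer = ParquetFileReader.readFooter(hadoopConf, filePath)
    val converter = new ParquetToSparkSchemaConverter(
      assumeBinaryIsString = false,   // taken from the session conf in the real code
      assumeInt96IsTimestamp = true)  // taken from the session conf in the real code
    val fileSchema = converter.convert(footer.getFileMetaData.getSchema)
    fileSchema.fields.exists { fileField =>
      tablePhysicalSchema.fields.find(_.name == fileField.name).exists { tableField =>
        (fileField.dataType, tableField.dataType) match {
          case (fromFile: AtomicType, inTable: AtomicType) => fromFile != inTable
          case _ => false // nested types are ignored in this sketch
        }
      }
    }
  }
}
```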

### Additional Context
In the initial approach, files are identified using their default row commit
version (part of the row tracking feature) and matched against the type changes
previously applied to the table and recorded in the table metadata: any file
written before the latest type change is assumed to use a different type and
must be rewritten.
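For comparison, a compressed sketch of that previous selection rule (it mirrors the `filterFilesRequiringRewrite` helper removed further down, and is not part of the new approach):

```scala
// `latestTypeChangeVersion` comes from the type change history recorded in the
// table metadata; `defaultRowCommitVersion` comes from the file's AddFile action
// and is None for files written before row tracking / type widening were enabled,
// which therefore always had to be rewritten.
object RowVersionFilterSketch {
  def writtenBeforeLatestTypeChange(
      latestTypeChangeVersion: Long,
      defaultRowCommitVersion: Option[Long]): Boolean =
    defaultRowCommitVersion.forall(_ < latestTypeChangeVersion)
}
```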

This requires multiple pieces of information to be accurately tracked:
- Default row commit versions must be correctly assigned to all files.
E.g. files that are copied over without modification must never be
assigned a new default row commit version. On the other hand, default
row commit versions are preserved across CLONE, but these versions don't
match anything in the newly cloned table.
- Type change history must be reliably recorded and preserved across
schema changes, e.g. column mapping.

Any bug will likely lead to files not being correctly rewritten before
removing the table feature, potentially leaving the table in an
unreadable state.



## How was this patch tested?
Tests covering CLONE and RESTORE were added in a previous PR: #3053.
Tests are added and updated in this PR to cover rewriting files with
different column types when removing the table feature.
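The code path under test is triggered by the standard feature-removal command; a hedged example of what the new tests run (the Spark session and table path below are placeholders):

```scala
// Hypothetical driver mirroring the test suite below: dropping the feature runs
// TypeWideningPreDowngradeCommand, which rewrites any file whose Parquet footer
// schema no longer matches the current table schema before removing the feature.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.TypeWideningTableFeature

val spark: SparkSession = SparkSession.active
val tablePath = "/tmp/delta/widened-table" // placeholder path
spark.sql(s"ALTER TABLE delta.`$tablePath` DROP FEATURE '${TypeWideningTableFeature.name}'")
```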
johanl-db authored Jun 5, 2024
1 parent 4b102d3 commit cd95799
Showing 6 changed files with 174 additions and 45 deletions.
@@ -23,9 +23,9 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
import org.apache.spark.sql.delta.managedcommit.ManagedCommitUtils
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.spark.sql.util.ScalaExtensions._

@@ -309,8 +309,9 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
* @return Return the number of files rewritten.
*/
private def rewriteFilesIfNeeded(): Long = {
val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot)
if (numFilesToRewrite == 0L) return 0L
if (!TypeWideningMetadata.containsTypeWideningMetadata(table.initialSnapshot.schema)) {
return 0L
}

// Wrap `table` in a ResolvedTable that can be passed to DeltaReorgTableCommand. The catalog &
// table ID won't be used by DeltaReorgTableCommand.
@@ -323,8 +324,9 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
reorgTableSpec = DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None)
)(Nil)

reorg.run(table.spark)
numFilesToRewrite
val rows = reorg.run(table.spark)
val metrics = rows.head.getAs[OptimizeMetrics](1)
metrics.numFilesRemoved
}

/**
29 changes: 0 additions & 29 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -96,33 +96,4 @@ object TypeWidening {
)
}
}

/**
* Filter the given list of files to only keep files that were written before the latest type
* change, if any. These older files contain a column or field with a type that is different than
* in the current table schema and must be rewritten when dropping the type widening table feature
* to make the table readable by readers that don't support the feature.
*/
def filterFilesRequiringRewrite(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
TypeWideningMetadata.getLatestTypeChangeVersion(snapshot.metadata.schema) match {
case Some(latestVersion) =>
files.filter(_.defaultRowCommitVersion match {
case Some(version) => version < latestVersion
// Files written before the type widening table feature was added to the table don't
// have a defaultRowCommitVersion. That does mean they were written before the latest
// type change.
case None => true
})
case None =>
Seq.empty
}


/**
* Return the number of files that were written before the latest type change and that then
* contain a column or field with a type that is different from the current table schema.
*/
def numFilesRequiringRewrite(snapshot: Snapshot): Long = {
filterFilesRequiringRewrite(snapshot, snapshot.allFiles.collect()).size
}
}
@@ -16,7 +16,7 @@

package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta.{Snapshot, TypeWidening}
import org.apache.spark.sql.delta.{DeltaColumnMapping, Snapshot}
import org.apache.spark.sql.delta.actions.AddFile

import org.apache.spark.sql.{Row, SparkSession}
@@ -97,14 +97,15 @@ sealed trait DeltaReorgOperation {
* Collects files that need to be processed by the reorg operation from the list of candidate
* files.
*/
def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile]
def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile]
}

/**
* Reorg operation to purge files with soft deleted rows.
*/
class DeltaPurgeOperation extends DeltaReorgOperation {
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
override def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile])
: Seq[AddFile] =
files.filter { file =>
(file.deletionVector != null && file.numPhysicalRecords.isEmpty) ||
file.numDeletedRecords > 0L
@@ -115,7 +116,8 @@ class DeltaPurgeOperation extends DeltaReorgOperation {
* Reorg operation to upgrade the iceberg compatibility version of a table.
*/
class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorgOperation {
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = {
override def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile])
: Seq[AddFile] = {
def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = {
if (file.tags == null) return true
val icebergCompatVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0")
@@ -129,7 +131,12 @@ class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorg
* Internal reorg operation to rewrite files to conform to the current table schema when dropping
* the type widening table feature.
*/
class DeltaRewriteTypeWideningOperation extends DeltaReorgOperation {
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
TypeWidening.filterFilesRequiringRewrite(snapshot, files)
class DeltaRewriteTypeWideningOperation extends DeltaReorgOperation with ReorgTableHelper {
override def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile])
: Seq[AddFile] = {
val physicalSchema = DeltaColumnMapping.renameColumns(snapshot.schema)
filterParquetFilesOnExecutors(spark, files, snapshot, ignoreCorruptFiles = false) {
schema => fileHasDifferentTypes(schema, physicalSchema)
}
}
}
@@ -265,8 +265,10 @@ class OptimizeExecutor(
val partitionSchema = txn.metadata.partitionSchema

val filesToProcess = optimizeContext.reorg match {
case Some(reorgOperation) => reorgOperation.filterFilesToReorg(txn.snapshot, candidateFiles)
case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
case Some(reorgOperation) =>
reorgOperation.filterFilesToReorg(sparkSession, txn.snapshot, candidateFiles)
case None =>
filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
}
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

@@ -0,0 +1,111 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.commands.VacuumCommand.generateCandidateFileMap
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetToSparkSchemaConverter}
import org.apache.spark.sql.types.{AtomicType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

trait ReorgTableHelper extends Serializable {
/**
* Determine whether `fileSchema` has any column whose type differs from
* `tablePhysicalSchema`.
*/
protected def fileHasDifferentTypes(
fileSchema: StructType,
tablePhysicalSchema: StructType): Boolean = {
SchemaMergingUtils.transformColumns(fileSchema, tablePhysicalSchema) {
case (_, StructField(_, fileType: AtomicType, _, _),
Some(StructField(_, tableType: AtomicType, _, _)), _) if fileType != tableType =>
return true
case (_, field, _, _) => field
}
false
}

/**
* Apply a filter on the list of AddFile to only keep the files that have physical parquet schema
* that satisfies the given filter function.
*
* Note: Filtering happens on the executors: **any variable captured by `filterFileFn` must be
* Serializable**
*/
protected def filterParquetFilesOnExecutors(
spark: SparkSession,
files: Seq[AddFile],
snapshot: Snapshot,
ignoreCorruptFiles: Boolean)(
filterFileFn: StructType => Boolean): Seq[AddFile] = {

val serializedConf = new SerializableConfiguration(snapshot.deltaLog.newDeltaHadoopConf())
val assumeBinaryIsString = spark.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = spark.sessionState.conf.isParquetINT96AsTimestamp
val dataPath = new Path(snapshot.deltaLog.dataPath.toString)

import org.apache.spark.sql.delta.implicits._

files.toDF(spark).as[AddFile].mapPartitions { iter =>
filterParquetFiles(iter.toList, dataPath, serializedConf.value, ignoreCorruptFiles,
assumeBinaryIsString, assumeInt96IsTimestamp)(filterFileFn).toIterator
}.collect()
}

protected def filterParquetFiles(
files: Seq[AddFile],
dataPath: Path,
configuration: Configuration,
ignoreCorruptFiles: Boolean,
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean)(
filterFileFn: StructType => Boolean): Seq[AddFile] = {
val nameToAddFileMap = generateCandidateFileMap(dataPath, files)

val fileStatuses = nameToAddFileMap.map { case (absPath, addFile) =>
new FileStatus(
/* length */ addFile.size,
/* isDir */ false,
/* blockReplication */ 0,
/* blockSize */ 1,
/* modificationTime */ addFile.modificationTime,
new Path(absPath)
)
}

val footers = DeltaFileOperations.readParquetFootersInParallel(
configuration,
fileStatuses.toList,
ignoreCorruptFiles)

val converter =
new ParquetToSparkSchemaConverter(assumeBinaryIsString, assumeInt96IsTimestamp)

val filesNeedToRewrite = footers.filter { footer =>
val fileSchema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
filterFileFn(fileSchema)
}.map(_.getFile.toString)
filesNeedToRewrite.map(absPath => nameToAddFileMap(absPath))
}
}
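To make the mismatch check concrete, a hypothetical usage of `fileHasDifferentTypes` (not part of the PR; it mixes in the trait above purely to reach the protected helper): a file still carrying the pre-widening BYTE type is flagged for rewrite, while a file already written with INT is left alone.

```scala
import org.apache.spark.sql.delta.commands.ReorgTableHelper
import org.apache.spark.sql.types.{ByteType, IntegerType, StructField, StructType}

object FileTypeCheckExample extends ReorgTableHelper {
  def main(args: Array[String]): Unit = {
    val tablePhysicalSchema = StructType(Seq(StructField("value", IntegerType)))
    val preWideningFile = StructType(Seq(StructField("value", ByteType)))
    val conformingFile = StructType(Seq(StructField("value", IntegerType)))

    assert(fileHasDifferentTypes(preWideningFile, tablePhysicalSchema))  // must be rewritten
    assert(!fileHasDifferentTypes(conformingFile, tablePhysicalSchema))  // already conforms
  }
}
```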
@@ -16,6 +16,7 @@

package org.apache.spark.sql.delta

import java.io.{File, PrintWriter}
import java.util.concurrent.TimeUnit

import com.databricks.spark.util.Log4jUsageLogger
@@ -1017,7 +1018,7 @@ trait DeltaTypeWideningStatsTests {
* Tests covering adding and removing the type widening table feature. Dropping the table feature
* also includes rewriting data files with the old type and removing type widening metadata.
*/
trait DeltaTypeWideningTableFeatureTests {
trait DeltaTypeWideningTableFeatureTests extends DeltaTypeWideningTestCases {
self: QueryTest
with ParquetTest
with RowTrackingTestUtils
@@ -1351,6 +1352,21 @@ trait DeltaTypeWideningTableFeatureTests {
}
}

for {
testCase <- supportedTestCases
}
test(s"drop feature after type change ${testCase.fromType.sql} -> ${testCase.toType.sql}") {
append(testCase.initialValuesDF.repartition(2))
sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE ${testCase.toType.sql}")
append(testCase.additionalValuesDF.repartition(3))
dropTableFeature(
expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
expectedNumFilesRewritten = 2,
expectedColumnTypes = Map("value" -> testCase.toType)
)
checkAnswer(readDeltaTable(tempPath), testCase.expectedResult)
}

test("drop feature after a type change with schema evolution") {
setupManualClock()
sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA")
@@ -1453,7 +1469,7 @@ trait DeltaTypeWideningTableFeatureTests {

dropTableFeature(
expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
expectedNumFilesRewritten = 3,
expectedNumFilesRewritten = 2,
expectedColumnTypes = Map("a" -> IntegerType)
)
checkAnswer(readDeltaTable(tempPath),
Expand Down Expand Up @@ -1498,6 +1514,26 @@ trait DeltaTypeWideningTableFeatureTests {
)
checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2)))
}

test("rewriting files fails if there are corrupted files") {
sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA")
sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE INT")
addSingleFile(Seq(2), IntegerType)
addSingleFile(Seq(3), IntegerType)
val filePath = deltaLog.update().allFiles.first().path
val pw = new PrintWriter(new File(tempPath, filePath))
pw.write("corrupted")
pw.close()

// Rewriting files when dropping type widening should ignore this config: if the corruption is
// transient, skipping the file would leave behind files that some clients can't read.
withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
val ex = intercept[SparkException] {
sql(s"ALTER TABLE delta.`$tempDir` DROP FEATURE '${TypeWideningTableFeature.name}'")
}
assert(ex.getMessage.contains("Cannot seek after EOF"))
}
}
}

/** Trait collecting tests covering type widening + column mapping. */
