From 9b50cd206004ae28105846eee9d910f39019ab8b Mon Sep 17 00:00:00 2001
From: Scott Sandre
Date: Tue, 27 Jun 2023 14:39:13 -0700
Subject: [PATCH] Delta Universal Format (UniForm) allows you to read Delta tables with Iceberg clients.

## Description

UniForm takes advantage of the fact that both Delta and Iceberg consist of Parquet data files and a metadata layer. UniForm automatically generates Iceberg metadata asynchronously, allowing Iceberg clients to read Delta tables as if they were Iceberg tables. You can expect negligible Delta write overhead when UniForm is enabled, because the Iceberg conversion and transaction occur asynchronously after the Delta commit. A single copy of the data files serves clients of both formats.

This PR adds the implementation for Universal Format (Iceberg) as well as the IcebergCompatV1 protocol validation.

To create a table with UniForm:

```sql
CREATE TABLE T(c1 INT) USING DELTA TBLPROPERTIES(
  'delta.universalFormat.enabledFormats' = 'iceberg');
```

To enable UniForm on an existing table:

```sql
ALTER TABLE T SET TBLPROPERTIES(
  'delta.columnMapping.mode' = 'name',
  'delta.universalFormat.enabledFormats' = 'iceberg');
```

See the IcebergCompatV1 protocol specification PR here: https://github.com/delta-io/delta/pull/1869.
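For illustration only (this mirrors what the new `ConvertToIcebergSuite` in this PR does rather than introducing any new API; the table path below is a placeholder), a second Spark session that has the Iceberg Spark runtime on its classpath can read the same table location directly:

```scala
// Minimal sketch, assuming an iceberg-spark-runtime JAR on the classpath and a table at the
// placeholder path that was written with UniForm enabled (same configs as ConvertToIcebergSuite).
import org.apache.spark.sql.SparkSession

val icebergSpark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hive")
  .getOrCreate()

// Loads the Iceberg metadata that UniForm generated alongside the Delta log.
icebergSpark.read.format("iceberg").load("/tmp/delta/T").show()
```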
## How was this patch tested?

New UT `iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala`, plus manual local publishing and integration testing with two Spark shells, one loaded with Delta and the other with Iceberg.

## Does this PR introduce _any_ user-facing changes?

Optional delta table property `delta.universalFormat.enabledFormats`.

Closes delta-io/delta#1870

GitOrigin-RevId: 8a4723680b12bb112190ee1f94a5eae9c4904a83
--- build.sbt | 57 ++- .../IcebergConversionTransaction.scala | 389 ++++++++++++++++++ .../icebergShaded/IcebergConverter.scala | 368 +++++++++++++++++ .../icebergShaded/IcebergSchemaUtils.scala | 113 +++++ .../IcebergTransactionUtils.scala | 170 ++++++++ .../sql/delta/ConvertToIcebergSuite.scala | 199 +++++++++ icebergShaded/.gitignore | 2 + icebergShaded/generate_iceberg_jars.py | 164 ++++++++ ...-schema-evolution-with-correct-field.patch | 186 +++++++++ ...must-not-delete-any-delta-data-files.patch | 177 ++++++++ .../resources/error/delta-error-classes.json | 47 +++ .../apache/spark/sql/delta/DeltaConfig.scala | 36 ++ .../apache/spark/sql/delta/DeltaErrors.scala | 61 +++ .../sql/delta/DeltaFileProviderUtils.scala | 92 +++++ .../org/apache/spark/sql/delta/DeltaLog.scala | 1 + .../spark/sql/delta/IcebergCompatV1.scala | 215 ++++++++++ .../sql/delta/OptimisticTransaction.scala | 28 +- .../sql/delta/ProvidesUniFormConverters.scala | 52 +++ .../apache/spark/sql/delta/TableFeature.scala | 13 + .../spark/sql/delta/UniversalFormat.scala | 168 ++++++++ .../commands/CreateDeltaTableCommand.scala | 10 + .../delta/hooks/IcebergConverterHook.scala | 51 +++ .../sql/delta/sources/DeltaSQLConf.scala | 18 + .../sql/delta/util/DeltaThreadPool.scala | 13 + 24 files changed, 2622 insertions(+), 8 deletions(-) create mode 100644 iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala create mode 100644 iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala create mode 100644 iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergSchemaUtils.scala create mode 100644 iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala create mode 100644 iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala create mode 100644 icebergShaded/.gitignore create mode 100644 icebergShaded/generate_iceberg_jars.py create mode 100644 icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch create mode 100644 icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/DeltaFileProviderUtils.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompatV1.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/hooks/IcebergConverterHook.scala
diff --git a/build.sbt b/build.sbt index 35943c3321c..c96dbc741e5 100644 --- a/build.sbt +++ b/build.sbt @@ -272,6 +272,13 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb")) ) ) +val icebergSparkRuntimeArtifactName = { + val (expMaj, expMin, _) = getMajorMinorPatch(sparkVersion) + s"iceberg-spark-runtime-$expMaj.$expMin" +} + +// Build using: build/sbt clean icebergShaded/compile iceberg/compile +// It will fail the first time, just re-run it. lazy val iceberg = (project in file("iceberg")) .dependsOn(spark % "compile->compile;test->test;provided->provided") .settings ( @@ -279,17 +286,53 @@ lazy val iceberg = (project in file("iceberg")) commonSettings, scalaStyleSettings, releaseSettings, - libraryDependencies ++= Seq( { - val (expMaj, expMin, _) = getMajorMinorPatch(sparkVersion) - ("org.apache.iceberg" % s"iceberg-spark-runtime-$expMaj.$expMin" % "1.3.0" % "provided") .cross(CrossVersion.binary) - }, + libraryDependencies ++= Seq( // Fix Iceberg's legacy java.lang.NoClassDefFoundError: scala/jdk/CollectionConverters$ error // due to legacy scala. - "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1" - ) + "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1", + "org.apache.iceberg" %% icebergSparkRuntimeArtifactName % "1.3.0" % "provided", + "com.github.ben-manes.caffeine" % "caffeine" % "2.9.3" + ), + Compile / unmanagedJars += (icebergShaded / assembly).value, + // Generate the assembly JAR as the package JAR + Compile / packageBin := assembly.value, + assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar", + assembly / logLevel := Level.Info, + assembly / test := {}, + assemblyPackageScala / assembleArtifact := false ) +lazy val generateIcebergJarsTask = TaskKey[Unit]("generateIcebergJars", "Generate Iceberg JARs") + +lazy val icebergShaded = (project in file("icebergShaded")) + .dependsOn(spark % "provided") + .settings ( + name := "iceberg-shaded", + commonSettings, + skipReleaseSettings, + + // Compile, patch and generate Iceberg JARs + generateIcebergJarsTask := { + import sys.process._ + val scriptPath = baseDirectory.value / "generate_iceberg_jars.py" + // Download iceberg code in `iceberg_src` dir and generate the JARs in `lib` dir + Seq("python3", scriptPath.getPath)! 
+ }, + Compile / unmanagedJars := (Compile / unmanagedJars).dependsOn(generateIcebergJarsTask).value, + cleanFiles += baseDirectory.value / "iceberg_src", + cleanFiles += baseDirectory.value / "lib", + + // Generated shaded Iceberg JARs + Compile / packageBin := assembly.value, + assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar", + assembly / logLevel := Level.Info, + assembly / test := {}, + assembly / assemblyShadeRules := Seq( + ShadeRule.rename("org.apache.iceberg.**" -> "shadedForDelta.@0").inAll, + ), + assemblyPackageScala / assembleArtifact := false, + // Make the 'compile' invoke the 'assembly' task to generate the uber jar. + ) lazy val hive = (project in file("connectors/hive")) .dependsOn(standaloneCosmetic) diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala new file mode 100644 index 00000000000..e5efc1c128d --- /dev/null +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala @@ -0,0 +1,389 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.icebergShaded + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.util.control.NonFatal + +import org.apache.spark.sql.delta.{DeltaFileProviderUtils, Snapshot} +import org.apache.spark.sql.delta.actions.{AddFile, Metadata, RemoveFile} +import org.apache.spark.sql.delta.icebergShaded.IcebergSchemaUtils._ +import org.apache.spark.sql.delta.icebergShaded.IcebergTransactionUtils._ +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.schema.SchemaUtils +import org.apache.commons.lang3.exception.ExceptionUtils +import org.apache.hadoop.conf.Configuration +import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction} +import shadedForDelta.org.apache.iceberg.hadoop.HadoopTables + +sealed trait IcebergTableOp +case object CREATE_TABLE extends IcebergTableOp +case object WRITE_TABLE extends IcebergTableOp +case object REPLACE_TABLE extends IcebergTableOp + +/** + * Used to prepare (convert) and then commit a set of Delta actions into the Iceberg table located + * at the same path as [[postCommitSnapshot]] + * + * + * @param conf Configuration for Iceberg Hadoop interactions. + * @param postCommitSnapshot Latest Delta snapshot associated with this Iceberg commit. + * @param tableOp How to instantiate the underlying Iceberg table. Defaults to WRITE_TABLE. 
+ */ +class IcebergConversionTransaction( + protected val conf: Configuration, + protected val postCommitSnapshot: Snapshot, + protected val tableOp: IcebergTableOp = WRITE_TABLE, + protected val lastConvertedDeltaVersion: Option[Long] = None) extends DeltaLogging { + + /////////////////////////// + // Nested Helper Classes // + /////////////////////////// + + protected abstract class TransactionHelper(impl: PendingUpdate[_]) { + private var committed = false + + def opType: String + + def commit(): Unit = { + assert(!committed, "Already committed.") + impl.commit() + committed = true + } + + private[icebergShaded]def hasCommitted: Boolean = committed + } + + /** + * API for appending new files in a table. + * + * e.g. INSERT + */ + class AppendOnlyHelper(appender: AppendFiles) extends TransactionHelper(appender) { + + override def opType: String = "append" + + def add(add: AddFile): Unit = { + appender.appendFile( + convertDeltaAddFileToIcebergDataFile( + add, + tablePath, + partitionSpec, + logicalToPhysicalPartitionNames, + postCommitSnapshot.statsSchema, + statsParser, + postCommitSnapshot.deltaLog + ) + ) + } + } + + /** + * API for deleting files from a table. + * + * e.g. DELETE + */ + class RemoveOnlyHelper(deleter: DeleteFiles) extends TransactionHelper(deleter) { + + override def opType: String = "delete" + + def remove(remove: RemoveFile): Unit = { + // We can just use the canonical RemoveFile.path instead of converting RemoveFile to DataFile. + // Note that in other helper APIs, converting a FileAction to a DataFile will also take care + // of canonicalizing the path. + deleter.deleteFile(canonicalizeFilePath(remove, tablePath)) + } + } + + /** + * API for overwriting files in a table. Replaces all the deleted files with the set of additions. + * + * e.g. UPDATE, MERGE + */ + class OverwriteHelper(overwriter: OverwriteFiles) extends TransactionHelper(overwriter) { + + override def opType: String = "overwrite" + + def add(add: AddFile): Unit = { + overwriter.addFile( + convertDeltaAddFileToIcebergDataFile( + add, + tablePath, + partitionSpec, + logicalToPhysicalPartitionNames, + postCommitSnapshot.statsSchema, + statsParser, + postCommitSnapshot.deltaLog + ) + ) + } + + def remove(remove: RemoveFile): Unit = { + overwriter.deleteFile( + convertDeltaRemoveFileToIcebergDataFile( + remove, tablePath, partitionSpec, logicalToPhysicalPartitionNames) + ) + } + } + + /** + * API for rewriting existing files in the table (i.e. replaces one set of data files with another + * set that contains the same data). + * + * e.g. 
OPTIMIZE + */ + class RewriteHelper(rewriter: RewriteFiles) extends TransactionHelper(rewriter) { + + override def opType: String = "rewrite" + + def rewrite(removes: Seq[RemoveFile], adds: Seq[AddFile]): Unit = { + val dataFilesToDelete = removes.map { f => + assert(!f.dataChange, "Rewrite operation should not add data") + convertDeltaRemoveFileToIcebergDataFile( + f, tablePath, partitionSpec, logicalToPhysicalPartitionNames) + }.toSet.asJava + + val dataFilesToAdd = adds.map { f => + assert(!f.dataChange, "Rewrite operation should not add data") + convertDeltaAddFileToIcebergDataFile( + f, + tablePath, + partitionSpec, + logicalToPhysicalPartitionNames, + postCommitSnapshot.statsSchema, + statsParser, + postCommitSnapshot.deltaLog + ) + }.toSet.asJava + + rewriter.rewriteFiles(dataFilesToDelete, dataFilesToAdd, 0) + } + } + + ////////////////////// + // Member variables // + ////////////////////// + + protected val tablePath = postCommitSnapshot.deltaLog.dataPath + protected val icebergSchema = + convertDeltaSchemaToIcebergSchema(postCommitSnapshot.metadata.schema) + protected val partitionSpec = + createPartitionSpec(icebergSchema, postCommitSnapshot.metadata.partitionColumns) + private val logicalToPhysicalPartitionNames = + getPartitionPhysicalNameMapping(postCommitSnapshot.metadata.partitionSchema) + + /** Parses the stats JSON string to convert Delta stats to Iceberg stats. */ + private val statsParser = + DeltaFileProviderUtils.createJsonStatsParser(postCommitSnapshot.statsSchema) + + /** Visible for testing. */ + private[icebergShaded]val txn = createIcebergTxn() + + /** Tracks if this transaction has already committed. You can only commit once. */ + private var committed = false + + /** Tracks the file updates (add, remove, overwrite, rewrite) made to this table. */ + private val fileUpdates = new ArrayBuffer[TransactionHelper]() + + /** Tracks if this transaction updates only the differences between a prev and new metadata. 
*/ + private var isMetadataUpdate = false + + ///////////////// + // Public APIs // + ///////////////// + + def getAppendOnlyHelper(): AppendOnlyHelper = { + val ret = new AppendOnlyHelper(txn.newAppend()) + fileUpdates += ret + ret + } + + def getRemoveOnlyHelper(): RemoveOnlyHelper = { + val ret = new RemoveOnlyHelper(txn.newDelete()) + fileUpdates += ret + ret + } + + def getOverwriteHelper(): OverwriteHelper = { + val ret = new OverwriteHelper(txn.newOverwrite()) + fileUpdates += ret + ret + } + + def getRewriteHelper(): RewriteHelper = { + val ret = new RewriteHelper(txn.newRewrite()) + fileUpdates += ret + ret + } + + /** + * Handles the following update scenarios + * - partition update -> throws + * - schema update -> sets the full new schema + * - properties update -> applies only the new properties + */ + def updateTableMetadata(newMetadata: Metadata, prevMetadata: Metadata): Unit = { + assert(!isMetadataUpdate, "updateTableMetadata already called") + isMetadataUpdate = true + + // Throws if partition evolution detected + if (newMetadata.partitionColumns != prevMetadata.partitionColumns) { + throw new IllegalStateException("Delta does not support partition evolution") + } + + if (newMetadata.schema != prevMetadata.schema) { + val differenceStr = SchemaUtils.reportDifferences(prevMetadata.schema, newMetadata.schema) + logInfo(s"Detected Delta schema update for table with name=${newMetadata.name}, " + + s"id=${newMetadata.id}:\n$differenceStr") + + txn.setSchema(icebergSchema).commit() + + recordDeltaEvent( + postCommitSnapshot.deltaLog, + "delta.iceberg.conversion.schemaChange", + data = Map( + "version" -> postCommitSnapshot.version, + "deltaSchemaDiff" -> differenceStr, + "icebergSchema" -> icebergSchema.toString.replace('\n', ';') + ) + ) + } + + val (propertyDeletes, propertyAdditions) = + detectPropertiesChange(newMetadata.configuration, prevMetadata.configuration) + + if (propertyDeletes.nonEmpty || propertyAdditions.nonEmpty) { + val updater = txn.updateProperties() + propertyDeletes.foreach(updater.remove) + propertyAdditions.foreach(kv => updater.set(kv._1, kv._2)) + updater.commit() + + recordDeltaEvent( + postCommitSnapshot.deltaLog, + "delta.iceberg.conversion.propertyChange", + data = Map("version" -> postCommitSnapshot.version) ++ + (if (propertyDeletes.nonEmpty) Map("deletes" -> propertyDeletes.toSeq) else Map.empty) ++ + (if (propertyAdditions.nonEmpty) Map("adds" -> propertyAdditions) else Map.empty) + ) + } + } + + def commit(): Unit = { + assert(!committed, "Cannot commit. Transaction already committed.") + + // At least one file or metadata updates is required when writing to an existing table. If + // creating or replacing a table, we can create an empty table with just the table metadata + // (schema, properties, etc.) + if (tableOp == WRITE_TABLE) { + assert(fileUpdates.nonEmpty || isMetadataUpdate, "Cannot commit WRITE. Transaction is empty.") + } + assert(fileUpdates.forall(_.hasCommitted), "Cannot commit. 
You have uncommitted changes.") + + txn.updateProperties() + .set(IcebergConverter.DELTA_VERSION_PROPERTY, postCommitSnapshot.version.toString) + .set(IcebergConverter.DELTA_TIMESTAMP_PROPERTY, postCommitSnapshot.timestamp.toString) + .commit() + + try { + txn.commitTransaction() + recordIcebergCommit() + } catch { + case NonFatal(e) => + recordIcebergCommit(Some(e)) + throw e + } + + committed = true + } + + /////////////////////// + // Protected Methods // + /////////////////////// + + protected def createIcebergTxn(): IcebergTransaction = { + val hadoopTables = new HadoopTables(conf) + val tableExists = hadoopTables.exists(tablePath.toString) + + def tableBuilder = { + val properties = getIcebergPropertiesFromDeltaProperties( + postCommitSnapshot.metadata.configuration + ) + + hadoopTables + .buildTable(tablePath.toString, icebergSchema) + .withPartitionSpec(partitionSpec) + .withProperties(properties.asJava) + } + + tableOp match { + case WRITE_TABLE => + if (tableExists) { + recordFrameProfile("IcebergConversionTransaction", "loadTable") { + hadoopTables.load(tablePath.toString).newTransaction() + } + } else { + throw new IllegalStateException(s"Cannot write to table $tablePath. Table doesn't exist.") + } + case CREATE_TABLE => + if (tableExists) { + throw new IllegalStateException(s"Cannot create table $tablePath. Table already exists.") + } else { + recordFrameProfile("IcebergConversionTransaction", "createTable") { + tableBuilder.createTransaction() + } + } + case REPLACE_TABLE => + if (tableExists) { + recordFrameProfile("IcebergConversionTransaction", "replaceTable") { + tableBuilder.replaceTransaction() + } + } else { + throw new IllegalStateException(s"Cannot replace table $tablePath. Table doesn't exist.") + } + } + } + + //////////////////// + // Helper Methods // + //////////////////// + + private def recordIcebergCommit(errorOpt: Option[Throwable] = None): Unit = { + val icebergTxnTypes = + if (fileUpdates.nonEmpty) Map("icebergTxnTypes" -> fileUpdates.map(_.opType)) else Map.empty + + val errorData = errorOpt.map { e => + Map( + "exception" -> ExceptionUtils.getMessage(e), + "stackTrace" -> ExceptionUtils.getStackTrace(e) + ) + }.getOrElse(Map.empty) + + + recordDeltaEvent( + postCommitSnapshot.deltaLog, + s"delta.iceberg.conversion.commit.${if (errorOpt.isEmpty) "success" else "error"}", + data = Map( + "version" -> postCommitSnapshot.version, + "timestamp" -> postCommitSnapshot.timestamp, + "tableOp" -> tableOp.getClass.getSimpleName.stripSuffix("$"), + "prevConvertedDeltaVersion" -> lastConvertedDeltaVersion + ) ++ icebergTxnTypes ++ errorData + ) + } + +} diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala new file mode 100644 index 00000000000..843f584e27f --- /dev/null +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala @@ -0,0 +1,368 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.icebergShaded + +import java.util.concurrent.atomic.AtomicReference +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.util.control.Breaks._ +import scala.util.control.NonFatal + +import org.apache.spark.sql.delta.{DeltaFileNotFoundException, DeltaFileProviderUtils, OptimisticTransactionImpl, Snapshot, UniversalFormatConverter} +import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, RemoveFile} +import org.apache.spark.sql.delta.hooks.IcebergConverterHook +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.commons.lang3.exception.ExceptionUtils +import org.apache.hadoop.fs.Path +import shadedForDelta.org.apache.iceberg.hadoop.HadoopTables + +import org.apache.spark.sql.SparkSession + +object IcebergConverter { + + + /** + * Property to be set in translated Iceberg metadata files. + * Indicates the delta commit version # that it corresponds to. + */ + val DELTA_VERSION_PROPERTY = "delta-version" + + /** + * Property to be set in translated Iceberg metadata files. + * Indicates the timestamp (milliseconds) of the delta commit that it corresponds to. + */ + val DELTA_TIMESTAMP_PROPERTY = "delta-timestamp" +} + +/** + * This class manages the transformation of delta snapshots into their Iceberg equivalent. + */ +class IcebergConverter(spark: SparkSession) + extends UniversalFormatConverter(spark) + with DeltaLogging { + + // Save an atomic reference of the snapshot being converted, and the txn that triggered + // resulted in the specified snapshot + protected val currentConversion = + new AtomicReference[(Snapshot, Option[OptimisticTransactionImpl])]() + protected val standbyConversion = + new AtomicReference[(Snapshot, Option[OptimisticTransactionImpl])]() + + // Whether our async converter thread is active. We may already have an alive thread that is + // about to shutdown, but in such cases this value should return false. + @GuardedBy("asyncThreadLock") + private var asyncConverterThreadActive: Boolean = false + private val asyncThreadLock = new Object + + /** + * Enqueue the specified snapshot to be converted to Iceberg. This will start an async + * job to run the conversion, unless there already is an async conversion running for + * this table. In that case, it will queue up the provided snapshot to be run after + * the existing job completes. + * Note that if there is another snapshot already queued, the previous snapshot will get + * removed from the wait queue. Only one snapshot is queued at any point of time. 
+ * + */ + override def enqueueSnapshotForConversion( + snapshotToConvert: Snapshot, + txn: Option[OptimisticTransactionImpl]): Unit = { + val log = snapshotToConvert.deltaLog + // Replace any previously queued snapshot + val previouslyQueued = standbyConversion.getAndSet((snapshotToConvert, txn)) + asyncThreadLock.synchronized { + if (!asyncConverterThreadActive) { + val threadName = IcebergConverterHook.ASYNC_ICEBERG_CONVERTER_THREAD_NAME + + s" [id=${snapshotToConvert.metadata.id}]" + val asyncConverterThread: Thread = new Thread(threadName) { + setDaemon(true) + + override def run(): Unit = + try { + var snapshotAndTxn = getNextSnapshot + while (snapshotAndTxn != null) { + val snapshotVal = snapshotAndTxn._1 + val prevTxn = snapshotAndTxn._2 + try { + logInfo(s"Converting Delta table [path=${log.logPath}, " + + s"tableId=${log.tableId}, version=${snapshotVal.version}] into Iceberg") + convertSnapshot(snapshotVal, prevTxn) + } catch { + case NonFatal(e) => + logWarning(s"Error when writing Iceberg metadata asynchronously", e) + recordDeltaEvent( + log, + "delta.iceberg.conversion.async.error", + data = Map( + "exception" -> ExceptionUtils.getMessage(e), + "stackTrace" -> ExceptionUtils.getStackTrace(e) + ) + ) + } + currentConversion.set(null) + // Pick next snapshot to convert if there's a new one + snapshotAndTxn = getNextSnapshot + } + } finally { + // shuttingdown thread + asyncThreadLock.synchronized { + asyncConverterThreadActive = false + } + } + + // Get a snapshot to convert from the icebergQueue. Sets the queue to null after. + private def getNextSnapshot: (Snapshot, Option[OptimisticTransactionImpl]) = + asyncThreadLock.synchronized { + val potentialSnapshotAndTxn = standbyConversion.get() + currentConversion.set(potentialSnapshotAndTxn) + standbyConversion.compareAndSet(potentialSnapshotAndTxn, null) + if (potentialSnapshotAndTxn == null) { + asyncConverterThreadActive = false + } + potentialSnapshotAndTxn + } + } + asyncConverterThread.start() + asyncConverterThreadActive = true + } + } + + // If there already was a snapshot waiting to be converted, log that snapshot info. + if (previouslyQueued != null) { +// previouslyQueued._1.uncache() + recordDeltaEvent( + snapshotToConvert.deltaLog, + "delta.iceberg.conversion.async.backlog", + data = Map( + "newVersion" -> snapshotToConvert.version, + "replacedVersion" -> previouslyQueued._1.version) + ) + } + } + + /** + * Convert the specified snapshot into Iceberg. NOTE: This operation is blocking. Call + * enqueueSnapshotForConversion to run the operation asynchronously. + * @param snapshotToConvert the snapshot that needs to be converted to Iceberg + * @param txnOpt the OptimisticTransaction that created snapshotToConvert. + * Used as a hint to avoid recomputing old metadata. 
+ * @return Converted Delta version and commit timestamp + */ + override def convertSnapshot( + snapshotToConvert: Snapshot, + txnOpt: Option[OptimisticTransactionImpl]): Option[(Long, Long)] = + recordFrameProfile("Delta", "IcebergConverter.convertSnapshot") { + val log = snapshotToConvert.deltaLog + val lastDeltaVersionConverted: Option[Long] = + loadLastDeltaVersionConverted(snapshotToConvert) + val maxCommitsToConvert = + spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_COMMITS_TO_CONVERT) + + // Nth to convert + if (lastDeltaVersionConverted.exists(_ == snapshotToConvert.version)) { + return None + } + + // Get the most recently converted delta snapshot, if applicable + val prevConvertedSnapshotOpt = (lastDeltaVersionConverted, txnOpt) match { + case (Some(version), Some(txn)) if version == txn.snapshot.version => + Some(txn.snapshot) + // Check how long it has been since we last converted to Iceberg. If outside the threshold, + // fall back to state reconstruction to get the actions, to protect driver from OOMing. + case (Some(version), _) if snapshotToConvert.version - version <= maxCommitsToConvert => + try { + // TODO: We can optimize this by providing a checkpointHint to getSnapshotAt. Check if + // txn.snapshot.version < version. If true, use txn.snapshot's checkpoint as a hint. + Some(log.getSnapshotAt(version)) + } catch { + // If we can't load the file since the last time Iceberg was converted, it's likely that + // the commit file expired. Treat this like a new Iceberg table conversion. + case _: DeltaFileNotFoundException => None + } + case (_, _) => None + } + + val tableOp = (lastDeltaVersionConverted, prevConvertedSnapshotOpt) match { + case (Some(_), Some(_)) => WRITE_TABLE + case (Some(_), None) => REPLACE_TABLE + case (None, None) => CREATE_TABLE + } + val icebergTxn = new IcebergConversionTransaction( + log.newDeltaHadoopConf(), snapshotToConvert, tableOp, lastDeltaVersionConverted) + + // Write out the actions taken since the last conversion (or since table creation). + // This is done in batches, with each batch corresponding either to one delta file, + // or to the specified batch size. + val actionBatchSize = + spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_ACTIONS_TO_CONVERT) + prevConvertedSnapshotOpt match { + case Some(prevSnapshot) => + // Read the actions directly from the delta json files. + // TODO: Run this as a spark job on executors + val deltaFiles = DeltaFileProviderUtils.getDeltaFilesInVersionRange( + spark, log, prevSnapshot.version + 1, snapshotToConvert.version) + + recordDeltaEvent( + snapshotToConvert.deltaLog, + "delta.iceberg.conversion.deltaCommitRange", + data = Map( + "fromVersion" -> (prevSnapshot.version + 1), + "toVersion" -> snapshotToConvert.version, + "numDeltaFiles" -> deltaFiles.length + ) + ) + + val actionsToConvert = DeltaFileProviderUtils.parallelReadAndParseDeltaFilesAsIterator( + log, spark, deltaFiles) + actionsToConvert.foreach { actionsIter => + try { + actionsIter.grouped(actionBatchSize).foreach { actionStrs => + runIcebergConversionForActions( + icebergTxn, + actionStrs.map(Action.fromJson), + log.dataPath, + prevConvertedSnapshotOpt) + } + } finally { + actionsIter.close() + } + } + // If the metadata hasn't changed, this will no-op. + icebergTxn.updateTableMetadata(snapshotToConvert.metadata, prevSnapshot.metadata) + + // If we don't have a snapshot of the last converted version, get all the table addFiles + // (via state reconstruction). 
+ case None => + val actionsToConvert = snapshotToConvert.allFiles.toLocalIterator().asScala + + recordDeltaEvent( + snapshotToConvert.deltaLog, + "delta.iceberg.conversion.batch", + data = Map( + "version" -> snapshotToConvert.version, + "numDeltaFiles" -> snapshotToConvert.numOfFiles + ) + ) + + actionsToConvert.grouped(actionBatchSize) + .foreach { actions => + runIcebergConversionForActions(icebergTxn, actions, log.dataPath, None) + } + } + icebergTxn.commit() + Some(snapshotToConvert.version, snapshotToConvert.timestamp) + } + + override def loadLastDeltaVersionConverted(snapshot: Snapshot): Option[Long] = + recordFrameProfile("Delta", "IcebergConverter.loadLastDeltaVersionConverted") { + val deltaLog = snapshot.deltaLog + val hadoopTables = new HadoopTables(deltaLog.newDeltaHadoopConf()) + if (hadoopTables.exists(deltaLog.dataPath.toString)) { + hadoopTables + .load(deltaLog.dataPath.toString) + .properties() + .asScala + .get(IcebergConverter.DELTA_VERSION_PROPERTY) + .map(_.toLong) + } else None + } + + /** + * Build an iceberg TransactionHelper from the provided txn, and commit the set of changes + * specified by the actionsToCommit. + */ + private[delta] def runIcebergConversionForActions( + icebergTxn: IcebergConversionTransaction, + actionsToCommit: Seq[Action], + dataPath: Path, + prevSnapshotOpt: Option[Snapshot]): Unit = { + prevSnapshotOpt match { + case None => + // If we don't have a previous snapshot, that implies that the table is either being + // created or replaced. We can assume that the actions have already been deduped, and + // only addFiles are present. + val appendHelper = icebergTxn.getAppendOnlyHelper() + actionsToCommit.foreach { + case a: AddFile => appendHelper.add(a) + case _ => throw new IllegalStateException(s"Must provide only AddFiles when creating " + + s"or replacing an Iceberg Table $dataPath.") + } + appendHelper.commit() + + case Some(_) => + // We have to go through the seq of actions twice, once to figure out the TransactionHelper + // to use, and then again to commit the actions. This is not too expensive, since the max # + // of actions is <= min(max # actions in delta json, ICEBERG_MAX_ACTIONS_TO_CONVERT) + var hasAdds = false + var hasRemoves = false + var hasDataChange = false + var hasCommitInfo = false + breakable { + for (action <- actionsToCommit) { + action match { + case a: AddFile => + hasAdds = true + if (a.dataChange) hasDataChange = true + case r: RemoveFile => + hasRemoves = true + if (r.dataChange) hasDataChange = true + case _: CommitInfo => hasCommitInfo = true + case _ => // Do nothing + } + if (hasAdds && hasRemoves && hasDataChange && hasCommitInfo) break // Short-circuit + } + } + + // We want to know whether all actions in the commit are contained in this `actionsToCommit` + // group. If yes, then we can safely determine whether the operation is a rewrite, delete, + // append, overwrite, etc. If not, then we can't make any assumptions since we have + // incomplete information, and we default to a rewrite. 
+ val allDeltaActionsCaptured = hasCommitInfo && actionsToCommit.size < + spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_ACTIONS_TO_CONVERT) + + val addsAndRemoves = actionsToCommit + .map(_.wrap) + .filter(sa => sa.remove != null || sa.add != null) + + if (hasAdds && hasRemoves && !hasDataChange && allDeltaActionsCaptured) { + val rewriteHelper = icebergTxn.getRewriteHelper() + val split = addsAndRemoves.partition(_.add == null) + rewriteHelper.rewrite(removes = split._1.map(_.remove), adds = split._2.map(_.add)) + rewriteHelper.commit() + } else if ((hasAdds && hasRemoves) || !allDeltaActionsCaptured) { + val overwriteHelper = icebergTxn.getOverwriteHelper() + addsAndRemoves.foreach { action => + if (action.add != null) { + overwriteHelper.add(action.add) + } else { + overwriteHelper.remove(action.remove) + } + } + overwriteHelper.commit() + } else if (hasAdds) { + val appendHelper = icebergTxn.getAppendOnlyHelper() + addsAndRemoves.foreach(action => appendHelper.add(action.add)) + appendHelper.commit() + } else if (hasRemoves) { + val removeHelper = icebergTxn.getRemoveOnlyHelper() + addsAndRemoves.foreach(action => removeHelper.remove(action.remove)) + removeHelper.commit() + } + } + } +} diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergSchemaUtils.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergSchemaUtils.scala new file mode 100644 index 00000000000..7e004cb5c1d --- /dev/null +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergSchemaUtils.scala @@ -0,0 +1,113 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.icebergShaded + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.delta.DeltaColumnMapping +import org.apache.spark.sql.delta.metering.DeltaLogging +import shadedForDelta.org.apache.iceberg.{Schema => IcebergSchema} +import shadedForDelta.org.apache.iceberg.types.{Type => IcebergType, Types => IcebergTypes} + +import org.apache.spark.sql.types._ + +object IcebergSchemaUtils extends DeltaLogging { + + ///////////////// + // Public APIs // + ///////////////// + + // scalastyle:off line.size.limit + /** + * Delta types are defined here: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#schema-serialization-format + * + * Iceberg types are defined here: https://iceberg.apache.org/spec/#schemas-and-data-types + */ + // scalastyle:on line.size.limit + def convertDeltaSchemaToIcebergSchema(deltaSchema: StructType): IcebergSchema = { + val icebergStruct = convertStruct(deltaSchema) + new IcebergSchema(icebergStruct.fields()) + } + + //////////////////// + // Helper Methods // + //////////////////// + + /** Visible for testing */ + private[delta] def convertStruct(deltaSchema: StructType): IcebergTypes.StructType = { + /** + * Recursively (i.e. for all nested elements) transforms the delta DataType `elem` into its + * corresponding Iceberg type. 
+ * + * - StructType -> IcebergTypes.StructType + * - ArrayType -> IcebergTypes.ListType + * - MapType -> IcebergTypes.MapType + * - primitive -> IcebergType.PrimitiveType + */ + def transform[E <: DataType](elem: E): IcebergType = elem match { + case StructType(fields) => + IcebergTypes.StructType.of(fields.map { f => + if (!DeltaColumnMapping.hasColumnId(f)) { + throw new UnsupportedOperationException("UniForm requires Column Mapping") + } + + IcebergTypes.NestedField.of( + DeltaColumnMapping.getColumnId(f), + f.nullable, + f.name, + transform(f.dataType), + f.getComment().orNull + ) + }.toList.asJava) + + case ArrayType(elementType, containsNull) => + throw new UnsupportedOperationException("UniForm doesn't support Array columns") + + case MapType(keyType, valueType, valueContainsNull) => + throw new UnsupportedOperationException("UniForm doesn't support Map columns") + + case atomicType: AtomicType => convertAtomic(atomicType) + + case other => + throw new UnsupportedOperationException(s"Cannot convert Delta type $other to Iceberg") + } + + transform(deltaSchema).asStructType() + } + + /** + * Converts delta atomic into an iceberg primitive. + * + * Visible for testing. + * + * https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types + */ + private[delta] def convertAtomic[E <: DataType](elem: E): IcebergType.PrimitiveType = elem match { + case StringType => IcebergTypes.StringType.get() + case LongType => IcebergTypes.LongType.get() + case IntegerType | ShortType | ByteType => IcebergTypes.IntegerType.get() + case FloatType => IcebergTypes.FloatType.get() + case DoubleType => IcebergTypes.DoubleType.get() + case d: DecimalType => IcebergTypes.DecimalType.of(d.precision, d.scale) + case BooleanType => IcebergTypes.BooleanType.get() + case BinaryType => IcebergTypes.BinaryType.get() + case DateType => IcebergTypes.DateType.get() + case TimestampType => IcebergTypes.TimestampType.withZone() + case TimestampNTZType => IcebergTypes.TimestampType.withoutZone() + case _ => throw new UnsupportedOperationException(s"Could not convert atomic type $elem") + } +} diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala new file mode 100644 index 00000000000..bd850c0a0d7 --- /dev/null +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala @@ -0,0 +1,170 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta.icebergShaded + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfigs, DeltaLog} +import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile} +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.hadoop.fs.Path +import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, Schema => IcebergSchema} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ + +object IcebergTransactionUtils + extends DeltaLogging + { + + ///////////////// + // Public APIs // + ///////////////// + + def createPartitionSpec( + icebergSchema: IcebergSchema, + partitionColumns: Seq[String]): PartitionSpec = { + if (partitionColumns.isEmpty) { + PartitionSpec.unpartitioned + } else { + val builder = PartitionSpec.builderFor(icebergSchema) + for (partitionName <- partitionColumns) { + builder.identity(partitionName) + } + builder.build() + } + } + + def convertDeltaAddFileToIcebergDataFile( + add: AddFile, + tablePath: Path, + partitionSpec: PartitionSpec, + logicalToPhysicalPartitionNames: Map[String, String], + statsSchema: StructType, + statsParser: String => InternalRow, + deltaLog: DeltaLog): DataFile = { + if (add.deletionVector != null) { + throw new UnsupportedOperationException("No support yet for DVs") + } + + var dataFileBuilder = + convertFileAction(add, tablePath, partitionSpec, logicalToPhysicalPartitionNames) + // Attempt to attach the number of records metric regardless of whether the Delta stats + // string is null/empty or not because this metric is required by Iceberg. If the number + // of records is both unavailable here and unavailable in the Delta stats, Iceberg will + // throw an exception when building the data file. + .withRecordCount(add.numLogicalRecords.getOrElse(-1L)) + + + dataFileBuilder.build() + } + + /** + * Note that APIs like [[shadedForDelta.org.apache.iceberg.OverwriteFiles#deleteFile]] take + * a DataFile, and not a DeleteFile as you might have expected. + */ + def convertDeltaRemoveFileToIcebergDataFile( + remove: RemoveFile, + tablePath: Path, + partitionSpec: PartitionSpec, + logicalToPhysicalPartitionNames: Map[String, String]): DataFile = { + convertFileAction(remove, tablePath, partitionSpec, logicalToPhysicalPartitionNames) + .withRecordCount(remove.numLogicalRecords.getOrElse(0L)) + .build() + } + + /** + * We expose this as a public API since APIs like + * [[shadedForDelta.org.apache.iceberg.DeleteFiles#deleteFile]] actually only need to take in + * a file path String, thus we don't need to actually convert a [[RemoveFile]] into a [[DataFile]] + * in this case. + */ + def canonicalizeFilePath(f: FileAction, tablePath: Path): String = { + // Recall that FileActions can have either relative paths or absolute paths (i.e. from shallow- + // cloned files). + // Iceberg spec requires path be fully qualified path, suitable for constructing a Hadoop Path + if (f.pathAsUri.isAbsolute) f.path else new Path(tablePath, f.path).toString + } + + /** Returns the (deletions, additions) iceberg table property changes. 
*/ + def detectPropertiesChange( + newProperties: Map[String, String], + prevPropertiesOpt: Map[String, String]): (Set[String], Map[String, String]) = { + val newPropertiesIcebergOnly = getIcebergPropertiesFromDeltaProperties(newProperties) + val prevPropertiesOptIcebergOnly = getIcebergPropertiesFromDeltaProperties(prevPropertiesOpt) + + if (prevPropertiesOptIcebergOnly == newPropertiesIcebergOnly) return (Set.empty, Map.empty) + + ( + prevPropertiesOptIcebergOnly.keySet.diff(newPropertiesIcebergOnly.keySet), + newPropertiesIcebergOnly + ) + } + + /** + * Only keep properties whose key starts with "delta.universalformat.config.iceberg" + * and strips the prefix from the key; Note the key is already normalized to lower case. + */ + def getIcebergPropertiesFromDeltaProperties( + properties: Map[String, String]): Map[String, String] = { + val prefix = DeltaConfigs.DELTA_UNIVERSAL_FORMAT_ICEBERG_CONFIG_PREFIX + properties.filterKeys(_.startsWith(prefix)).map(kv => (kv._1.stripPrefix(prefix), kv._2)).toMap + } + + /** Returns the mapping of logicalPartitionColName -> physicalPartitionColName */ + def getPartitionPhysicalNameMapping(partitionSchema: StructType): Map[String, String] = { + partitionSchema.fields.map(f => f.name -> DeltaColumnMapping.getPhysicalName(f)).toMap + } + + //////////////////// + // Helper Methods // + //////////////////// + + /** Visible for testing. */ + private[delta] def convertFileAction( + f: FileAction, + tablePath: Path, + partitionSpec: PartitionSpec, + logicalToPhysicalPartitionNames: Map[String, String]): DataFiles.Builder = { + val absPath = canonicalizeFilePath(f, tablePath) + + var builder = DataFiles + .builder(partitionSpec) + .withPath(absPath) + .withFileSizeInBytes(f.getFileSize) + .withFormat(FileFormat.PARQUET) + + if (partitionSpec.isPartitioned) { + val partitionPath = partitionSpec + .fields() + .asScala + .map(_.name) + .map { logicalPartCol => + // The Iceberg Schema and PartitionSpec all use the column logical names. + // Delta FileAction::partitionValues, however, uses physical names. + val physicalPartKey = logicalToPhysicalPartitionNames(logicalPartCol) + s"$logicalPartCol=${f.partitionValues(physicalPartKey)}" + } + .mkString("/") + + builder = builder.withPartitionPath(partitionPath) + } + + builder + } +} diff --git a/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala b/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala new file mode 100644 index 00000000000..94d0b7e590a --- /dev/null +++ b/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala @@ -0,0 +1,199 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta + +import java.io.File + +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.sql.{QueryTest, Row, SparkSession} +import org.apache.spark.util.Utils +import org.apache.spark.SparkContext + +class ConvertToIcebergSuite extends QueryTest with Eventually { + + private var _sparkSession: SparkSession = null + private var _sparkSessionWithDelta: SparkSession = null + private var _sparkSessionWithIceberg: SparkSession = null + + private var warehousePath: File = null + private var testTablePath: String = null + private val testTableName: String = "deltaTable" + + override def spark: SparkSession = _sparkSession + + override def beforeAll(): Unit = { + super.beforeAll() + warehousePath = Utils.createTempDir() + _sparkSessionWithDelta = createSparkSessionWithDelta() + _sparkSessionWithIceberg = createSparkSessionWithIceberg() + require(!_sparkSessionWithDelta.eq(_sparkSessionWithIceberg), "separate sessions expected") + } + + override def beforeEach(): Unit = { + super.beforeEach() + testTablePath = Utils.createTempDir().getAbsolutePath + } + + override def afterEach(): Unit = { + super.afterEach() + Utils.deleteRecursively(new File(testTablePath)) + _sparkSessionWithDelta.sql(s"DROP TABLE IF EXISTS $testTableName") + _sparkSessionWithIceberg.sql(s"DROP TABLE IF EXISTS $testTableName") + } + + override def afterAll(): Unit = { + super.afterAll() + if (warehousePath != null) Utils.deleteRecursively(warehousePath) + SparkContext.getActive.foreach(_.stop()) + } + + test("basic test - path based table created with SQL") { + runDeltaSql(s"""CREATE TABLE delta.`$testTablePath` (col1 INT) USING DELTA + |TBLPROPERTIES ( + | 'delta.columnMapping.mode' = 'id', + | 'delta.universalFormat.enabledFormats' = 'iceberg' + |)""".stripMargin) + verifyReadWithIceberg(testTablePath, Seq()) + runDeltaSql(s"INSERT INTO delta.`$testTablePath` VALUES (123)") + verifyReadWithIceberg(testTablePath, Seq(Row(123))) + } + + test("basic test - catalog table created with SQL") { + runDeltaSql(s"""CREATE TABLE $testTableName(col1 INT) USING DELTA + |LOCATION '$testTablePath' + |TBLPROPERTIES ( + | 'delta.columnMapping.mode' = 'id', + | 'delta.universalFormat.enabledFormats' = 'iceberg' + |)""".stripMargin) + verifyReadWithIceberg(testTablePath, Seq()) + runDeltaSql(s"INSERT INTO $testTableName VALUES (123)") + verifyReadWithIceberg(testTablePath, Seq(Row(123))) + } + + test("basic test - path based table created with DataFrame") { + withDeltaSparkSession { deltaSpark => + withDefaultTablePropsInSQLConf { + deltaSpark.range(10).write.format("delta").save(testTablePath) + } + } + verifyReadWithIceberg(testTablePath, 0 to 9 map (Row(_))) + withDeltaSparkSession { deltaSpark => + deltaSpark.range(10, 20, 1) + .write.format("delta").mode("append").save(testTablePath) + } + verifyReadWithIceberg(testTablePath, 0 to 19 map (Row(_))) + } + + test("basic test - catalog table created with DataFrame") { + withDeltaSparkSession { deltaSpark => + withDefaultTablePropsInSQLConf { + deltaSpark.range(10).write.format("delta") + .option("path", testTablePath) + .saveAsTable(testTableName) + } + } + verifyReadWithIceberg(testTablePath, 0 to 9 map (Row(_))) + withDeltaSparkSession { deltaSpark => + deltaSpark.range(10, 20, 1) + .write.format("delta").mode("append") + .option("path", testTablePath) + .saveAsTable(testTableName) + } + verifyReadWithIceberg(testTablePath, 0 to 19 map (Row(_))) + } + + def runDeltaSql(sqlStr: String): Unit = { + 
withDeltaSparkSession { deltaSpark => + deltaSpark.sql(sqlStr) + } + } + + def verifyReadWithIceberg(tablePath: String, expectedAnswer: Seq[Row]): Unit = { + withIcebergSparkSession { icebergSparkSession => + eventually(timeout(10.seconds)) { + val icebergDf = icebergSparkSession.read.format("iceberg").load(tablePath) + checkAnswer(icebergDf, expectedAnswer) + } + } + } + + def tablePropsForCreate: String = { + s""" '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'id', + | '${DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.key}' = 'iceberg'""".stripMargin + } + + def withDefaultTablePropsInSQLConf(f: => Unit): Unit = { + withSQLConf( + DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey -> "id", + DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.defaultTablePropertyKey -> "iceberg" + ) { f } + } + + def withDeltaSparkSession(f: SparkSession => Unit): Unit = { + withSparkSession(_sparkSessionWithDelta, f) + } + + def withIcebergSparkSession(f: SparkSession => Unit): Unit = { + withSparkSession(_sparkSessionWithIceberg, f) + } + + def withSparkSession(sessionToUse: SparkSession, f: SparkSession => Unit): Unit = { + try { + SparkSession.setDefaultSession(sessionToUse) + SparkSession.setActiveSession(sessionToUse) + _sparkSession = sessionToUse + f(sessionToUse) + } finally { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + _sparkSession = null + } + } + + protected def createSparkSessionWithDelta(): SparkSession = { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + val sparkSession = SparkSession.builder() + .master("local[*]") + .appName("DeltaSession") + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") + .getOrCreate() + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + sparkSession + } + + protected def createSparkSessionWithIceberg(): SparkSession = { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + val sparkSession = SparkSession.builder() + .master("local[*]") + .appName("IcebergSession") + .config("spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") + .config("spark.sql.catalog.spark_catalog.type", "hive") + .getOrCreate() + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + sparkSession + } + +} diff --git a/icebergShaded/.gitignore b/icebergShaded/.gitignore new file mode 100644 index 00000000000..efaced7fb6f --- /dev/null +++ b/icebergShaded/.gitignore @@ -0,0 +1,2 @@ +iceberg_src +lib \ No newline at end of file diff --git a/icebergShaded/generate_iceberg_jars.py b/icebergShaded/generate_iceberg_jars.py new file mode 100644 index 00000000000..56a08bb43c4 --- /dev/null +++ b/icebergShaded/generate_iceberg_jars.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 + +# +# Copyright (2021) The Delta Lake Project Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +import argparse +import os +import glob +import subprocess +import shlex +import shutil +from os import path + +iceberg_lib_dir_name = "lib" +iceberg_src_dir_name = "iceberg_src" +iceberg_patches_dir_name = "iceberg_src_patches" + +iceberg_src_commit_hash = "ede085d0f7529f24acd0c81dd0a43f7bb969b763" +iceberg_src_branch_with_commit_hash = "master" # only this branch will be downloaded +iceberg_src_compiled_jar_rel_paths = [ # related to `iceberg_src_dir_name` + "bundled-guava/build/libs/iceberg-bundled-guava-1.2.0-SNAPSHOT.jar", + "common/build/libs/iceberg-common-1.2.0-SNAPSHOT.jar", + "api/build/libs/iceberg-api-1.2.0-SNAPSHOT.jar", + "core/build/libs/iceberg-core-1.2.0-SNAPSHOT.jar", + "parquet/build/libs/iceberg-parquet-1.2.0-SNAPSHOT.jar", +] + +iceberg_root_dir = path.abspath(path.dirname(__file__)) +iceberg_src_dir = path.join(iceberg_root_dir, iceberg_src_dir_name) +iceberg_patches_dir = path.join(iceberg_root_dir, iceberg_patches_dir_name) +iceberg_lib_dir = path.join(iceberg_root_dir, iceberg_lib_dir_name) + + +def compile_jar_rel_path_to_lib_jar_path(jar_rel_path): + jar_file_name = path.basename(path.normpath(jar_rel_path)) + jar_file_name_splits = path.splitext(jar_file_name) + new_jar_file_name = "%s_%s%s" % (jar_file_name_splits[0], iceberg_src_commit_hash, jar_file_name_splits[1]) + return path.join(iceberg_lib_dir, new_jar_file_name) + + +def iceberg_jars_exists(): + for jar_rel_path in iceberg_src_compiled_jar_rel_paths: + if not path.exists(compile_jar_rel_path_to_lib_jar_path(jar_rel_path)): + return False + return True + + +def prepare_iceberg_source(): + with WorkingDirectory(iceberg_root_dir): + print(">>> Cloning Iceberg repo") + shutil.rmtree(iceberg_src_dir_name, ignore_errors=True) + run_cmd("git config user.email \"<>\"") + run_cmd("git config user.name \"Anonymous\"") + run_cmd("git clone --branch %s https://github.com/apache/iceberg.git %s" % + (iceberg_src_branch_with_commit_hash, iceberg_src_dir_name)) + + with WorkingDirectory(iceberg_src_dir): + run_cmd("git config user.email \"<>\"") + run_cmd("git config user.name \"Anonymous\"") + run_cmd("git checkout %s" % iceberg_src_commit_hash) + + print(">>> Applying patch files") + patch_files = glob.glob(path.join(iceberg_patches_dir, "*.patch")) + patch_files.sort() + + for patch_file in patch_files: + print(">>> Applying '%s'" % patch_file) + run_cmd("git apply %s" % patch_file) + run_cmd("git add .") + run_cmd("git commit -a -m 'applied %s'" % path.basename(patch_file)) + + +def generate_iceberg_jars(): + print(">>> Compiling JARs") + with WorkingDirectory(iceberg_src_dir): + # disable style checks (can fail with patches) and tests + build_args = "-x spotlessCheck -x checkstyleMain -x test -x integrationTest" + run_cmd("./gradlew :iceberg-core:build %s" % build_args) + run_cmd("./gradlew :iceberg-parquet:build %s" % build_args) + + print(">>> Copying JARs to lib directory") + shutil.rmtree(iceberg_lib_dir, ignore_errors=True) + os.mkdir(iceberg_lib_dir) + + for compiled_jar_rel_path in iceberg_src_compiled_jar_rel_paths: + compiled_jar_full_path = path.join(iceberg_src_dir, compiled_jar_rel_path) + if not path.exists(compiled_jar_full_path): + raise Exception("Could not find the jar " + compiled_jar_full_path) + lib_jar_full_path = compile_jar_rel_path_to_lib_jar_path(compiled_jar_rel_path) + shutil.copyfile(compiled_jar_full_path, lib_jar_full_path) + + if not iceberg_jars_exists(): + raise 
Exception("JAR copying failed") + + +def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs): + if isinstance(cmd, str): + cmd = shlex.split(cmd) + cmd_env = os.environ.copy() + if env: + cmd_env.update(env) + + if stream_output: + child = subprocess.Popen(cmd, env=cmd_env, **kwargs) + exit_code = child.wait() + if throw_on_error and exit_code != 0: + raise Exception("Non-zero exitcode: %s" % (exit_code)) + print("----\n") + return exit_code + else: + child = subprocess.Popen( + cmd, + env=cmd_env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **kwargs) + (stdout, stderr) = child.communicate() + exit_code = child.wait() + if throw_on_error and exit_code != 0: + raise Exception( + "Non-zero exitcode: %s\n\nSTDOUT:\n%s\n\nSTDERR:%s" % + (exit_code, stdout, stderr)) + return (exit_code, stdout, stderr) + + +# pylint: disable=too-few-public-methods +class WorkingDirectory(object): + def __init__(self, working_directory): + self.working_directory = working_directory + self.old_workdir = os.getcwd() + + def __enter__(self): + os.chdir(self.working_directory) + + def __exit__(self, tpe, value, traceback): + os.chdir(self.old_workdir) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--force", + required=False, + default=False, + action="store_true", + help="Force the generation even if already generated, useful for testing.") + args = parser.parse_args() + + if args.force or not iceberg_jars_exists(): + prepare_iceberg_source() + generate_iceberg_jars() diff --git a/icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch b/icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch new file mode 100644 index 00000000000..8be6a077109 --- /dev/null +++ b/icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch @@ -0,0 +1,186 @@ +Creates a new `SetSchema` pending update that will let us set the latest iceberg schema instead of having to apply incremental/delta changes to the existing schema. + +This PR requires that column mapping ID mode be enabled, and uses the same fieldId on the iceberg schema using the delta schema columnIds. + +This PR also blocks MapType or ArrayType (on the iceberg side). Doing so requires more complicated fieldId calculation, which is out of scope of this PR and of the first milestone. TLDR Delta Map and Array types have their inner elements as DataTypes, but iceberg Map and List types have their inner elements as actual fields (which need a field ID). So even though delta column mapping ID mode will assign IDs to each delta field, this is insufficient as it won't assign IDs for these maps/array types. 
+ +--- + .../java/org/apache/iceberg/SetSchema.java | 25 ++ + .../java/org/apache/iceberg/Transaction.java | 7 + + .../org/apache/iceberg/BaseTransaction.java | 8 + + .../iceberg/CommitCallbackTransaction.java | 5 + + .../org/apache/iceberg/SetSchemaImpl.java | 45 ++++ + .../org/apache/iceberg/TableMetadata.java | 14 +- + .../IcebergConversionTransaction.scala | 232 +++++++++--------- + .../tahoe/iceberg/IcebergSchemaUtils.scala | 55 +++-- + .../iceberg/IcebergTransactionUtils.scala | 16 +- + .../IcebergConversionTransactionSuite.scala | 224 ++++++++++++++++- + .../tahoe/iceberg/IcebergConverterSuite.scala | 3 +- + .../iceberg/IcebergSchemaUtilsSuite.scala | 200 ++++++++------- + .../IcebergTransactionUtilsSuite.scala | 25 +- + 13 files changed, 595 insertions(+), 264 deletions(-) + create mode 100644 api/src/main/java/org/apache/iceberg/SetSchema.java + create mode 100644 core/src/main/java/org/apache/iceberg/SetSchemaImpl.java + +diff --git a/api/src/main/java/org/apache/iceberg/SetSchema.java b/connector/iceberg-core/api/src/main/java/org/apache/iceberg/SetSchema.java +new file mode 100644 +index 00000000000..042a594ae5b +--- /dev/null ++++ b/api/src/main/java/org/apache/iceberg/SetSchema.java +@@ -0,0 +1,25 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package org.apache.iceberg; ++ ++/** ++ * API to set the new, latest Iceberg schema. ++ */ ++public interface SetSchema extends PendingUpdate { } +diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/connector/iceberg-core/api/src/main/java/org/apache/iceberg/Transaction.java +index 090b5dfe37c..3879c9a9146 100644 +--- a/api/src/main/java/org/apache/iceberg/Transaction.java ++++ b/api/src/main/java/org/apache/iceberg/Transaction.java +@@ -37,6 +37,13 @@ public interface Transaction { + */ + UpdateSchema updateSchema(); + ++ /** ++ * Create a new {@link SetSchema} to set the new table schema. ++ * ++ * @return a new {@link SetSchema} ++ */ ++ SetSchema setSchema(Schema newSchema); ++ + /** + * Create a new {@link UpdatePartitionSpec} to alter the partition spec of this table. 
+ * +diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/BaseTransaction.java +index 241738fedab..e299d04ebbd 100644 +--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java ++++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java +@@ -113,6 +113,14 @@ public class BaseTransaction implements Transaction { + return schemaChange; + } + ++ @Override ++ public SetSchema setSchema(Schema newSchema) { ++ checkLastOperationCommitted("SetSchema"); ++ SetSchema setSchema = new SetSchemaImpl(transactionOps, transactionOps.current(), newSchema); ++ updates.add(setSchema); ++ return setSchema; ++ } ++ + @Override + public UpdatePartitionSpec updateSpec() { + checkLastOperationCommitted("UpdateSpec"); +diff --git a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java +index 19b74a65eca..6a2d7614a82 100644 +--- a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java ++++ b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java +@@ -41,6 +41,11 @@ class CommitCallbackTransaction implements Transaction { + return wrapped.updateSchema(); + } + ++ @Override ++ public SetSchema setSchema(Schema newSchema) { ++ return wrapped.setSchema(newSchema); ++ } ++ + @Override + public UpdatePartitionSpec updateSpec() { + return wrapped.updateSpec(); +diff --git a/core/src/main/java/org/apache/iceberg/SetSchemaImpl.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/SetSchemaImpl.java +new file mode 100644 +index 00000000000..ce6731a4e13 +--- /dev/null ++++ b/core/src/main/java/org/apache/iceberg/SetSchemaImpl.java +@@ -0,0 +1,45 @@ ++/* ++ * Copyright (2021) The Delta Lake Project Authors. ++ * ++ * Licensed under the Apache License, Version 2.0 (the "License"); ++ * you may not use this file except in compliance with the License. ++ * You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++ ++ ++ ++package org.apache.iceberg; ++ ++public class SetSchemaImpl implements SetSchema { ++ ++ private final TableOperations ops; ++ private final TableMetadata base; ++ private final Schema newSchema; ++ ++ public SetSchemaImpl(TableOperations ops, TableMetadata base, Schema newSchema) { ++ this.ops = ops; ++ this.base = base; ++ this.newSchema = newSchema; ++ } ++ ++ @Override ++ public Schema apply() { ++ return newSchema; ++ } ++ ++ @Override ++ public void commit() { ++ // This will override the current schema ++ TableMetadata update = base.updateSchema(apply(), newSchema.highestFieldId()); ++ ops.commit(base, update); ++ } ++} +diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/TableMetadata.java +index afa2c7ac2d5..52546f02a75 100644 +--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java ++++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java +@@ -1339,11 +1339,15 @@ public class TableMetadata implements Serializable { + } + + private int addSchemaInternal(Schema schema, int newLastColumnId) { +- Preconditions.checkArgument( +- newLastColumnId >= lastColumnId, +- "Invalid last column ID: %s < %s (previous last column ID)", +- newLastColumnId, +- lastColumnId); ++ // Since we use txn.setSchema instead of txn.updateSchema, we are manually setting the new ++ // schema. Thus, if we drop the last column, it is clearly possible and valid for the ++ // newLastColumnId to be < the previous lastColumnId. Thus, we ignore this check. ++ // ++ // Preconditions.checkArgument( ++ // newLastColumnId >= lastColumnId, ++ // "Invalid last column ID: %s < %s (previous last column ID)", ++ // newLastColumnId, ++ // lastColumnId); + + int newSchemaId = reuseOrCreateNewSchemaId(schema); + boolean schemaFound = schemasById.containsKey(newSchemaId); +-- +2.39.2 (Apple Git-143) diff --git a/icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch b/icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch new file mode 100644 index 00000000000..a181f065040 --- /dev/null +++ b/icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch @@ -0,0 +1,177 @@ +iceberg core must NOT delete any delta data files + +--- + .../iceberg/IncrementalFileCleanup.java | 8 +-- + .../apache/iceberg/ReachableFileCleanup.java | 5 +- + .../apache/iceberg/TestRemoveSnapshots.java | 57 +++++++++++-------- + 3 files changed, 40 insertions(+), 30 deletions(-) + +diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java +index d894dcbf36d..ead7ea6b076 100644 +--- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java ++++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java +@@ -256,10 +256,10 @@ class IncrementalFileCleanup extends FileCleanupStrategy { + } + }); + +- Set filesToDelete = +- findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); +- +- deleteFiles(filesToDelete, "data"); ++ // iceberg core MUST NOT delete any data files which are managed by delta ++ // Set filesToDelete = ++ // findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); ++ // deleteFiles(filesToDelete, "data"); + LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete)); + LOG.warn("Manifests Lists to delete: {}", Joiner.on(", 
").join(manifestListsToDelete)); + deleteFiles(manifestsToDelete, "manifest"); +diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java +index ccbee78e27b..da888a63b3d 100644 +--- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java ++++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java +@@ -72,8 +72,9 @@ class ReachableFileCleanup extends FileCleanupStrategy { + snapshotsAfterExpiration, deletionCandidates, currentManifests::add); + + if (!manifestsToDelete.isEmpty()) { +- Set dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests); +- deleteFiles(dataFilesToDelete, "data"); ++ // iceberg core MUST NOT delete any data files which are managed by delta ++ // Set dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests); ++ // deleteFiles(dataFilesToDelete, "data"); + Set manifestPathsToDelete = + manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet()); + deleteFiles(manifestPathsToDelete, "manifest"); +diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/connector/iceberg-core/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +index 53e5af520d9..95fa8e41de1 100644 +--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java ++++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +@@ -147,8 +147,9 @@ public class TestRemoveSnapshots extends TableTestBase { + secondSnapshot + .allManifests(table.io()) + .get(0) +- .path(), // manifest contained only deletes, was dropped +- FILE_A.path()), // deleted ++ .path() // manifest contained only deletes, was dropped ++ // FILE_A.path() should NOT delete data files ++ ), // deleted + deletedFiles); + } + +@@ -209,8 +210,9 @@ public class TestRemoveSnapshots extends TableTestBase { + .allManifests(table.io()) + .get(0) + .path(), // manifest was rewritten for delete +- secondSnapshot.manifestListLocation(), // snapshot expired +- FILE_A.path()), // deleted ++ secondSnapshot.manifestListLocation() // snapshot expired ++ // FILE_A.path() should not delete any data files ++ ), + deletedFiles); + } + +@@ -309,8 +311,9 @@ public class TestRemoveSnapshots extends TableTestBase { + Sets.newHashSet( + secondSnapshot.manifestListLocation(), // snapshot expired + Iterables.getOnlyElement(secondSnapshotManifests) +- .path(), // manifest is no longer referenced +- FILE_B.path()), // added, but rolled back ++ .path() // manifest is no longer referenced ++ // FILE_B.path() should not delete any data files ++ ), + deletedFiles); + } + +@@ -686,7 +689,8 @@ public class TestRemoveSnapshots extends TableTestBase { + + removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); + +- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); ++ Assert.assertTrue("FILE_A should NOT be deleted", ++ !deletedFiles.contains(FILE_A.path().toString())); + } + + @Test +@@ -712,7 +716,8 @@ public class TestRemoveSnapshots extends TableTestBase { + + removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); + +- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); ++ Assert.assertTrue("FILE_A should NOT be deleted", ++ !deletedFiles.contains(FILE_A.path().toString())); + } + + @Test +@@ -749,8 +754,10 @@ public class TestRemoveSnapshots extends TableTestBase { + + 
removeSnapshots(table).expireOlderThan(t4).deleteWith(deletedFiles::add).commit(); + +- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); +- Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); ++ Assert.assertTrue("FILE_A should NOT be deleted", ++ !deletedFiles.contains(FILE_A.path().toString())); ++ Assert.assertTrue("FILE_B should NOT be deleted", ++ !deletedFiles.contains(FILE_B.path().toString())); + } + + @Test +@@ -824,9 +831,11 @@ public class TestRemoveSnapshots extends TableTestBase { + Sets.newHashSet( + "remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3")); + +- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); +- Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); +- Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0); ++ Assert.assertTrue("FILE_A should NOT be deleted", ++ !deletedFiles.contains(FILE_A.path().toString())); ++ Assert.assertTrue("FILE_B should NOT be deleted", ++ !deletedFiles.contains(FILE_B.path().toString())); ++ // Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0); + } + + @Test +@@ -885,13 +894,13 @@ public class TestRemoveSnapshots extends TableTestBase { + Set expectedDeletes = Sets.newHashSet(); + expectedDeletes.add(snapshotA.manifestListLocation()); + +- // Files should be deleted of dangling staged snapshot +- snapshotB +- .addedDataFiles(table.io()) +- .forEach( +- i -> { +- expectedDeletes.add(i.path().toString()); +- }); ++ // Files should NOT be deleted of dangling staged snapshot ++ // snapshotB ++ // .addedDataFiles(table.io()) ++ // .forEach( ++ // i -> { ++ // expectedDeletes.add(i.path().toString()); ++ // }); + + // ManifestList should be deleted too + expectedDeletes.add(snapshotB.manifestListLocation()); +@@ -1144,10 +1153,10 @@ public class TestRemoveSnapshots extends TableTestBase { + removeSnapshots(table).expireOlderThan(fourthSnapshotTs).deleteWith(deletedFiles::add).commit(); + + Assert.assertEquals( +- "Should remove old delete files and delete file manifests", ++ "Should only delete file manifests", + ImmutableSet.builder() +- .add(FILE_A.path()) +- .add(FILE_A_DELETES.path()) ++ // .add(FILE_A.path()) ++ // .add(FILE_A_DELETES.path()) + .add(firstSnapshot.manifestListLocation()) + .add(secondSnapshot.manifestListLocation()) + .add(thirdSnapshot.manifestListLocation()) +@@ -1501,7 +1510,7 @@ public class TestRemoveSnapshots extends TableTestBase { + expectedDeletes.addAll(manifestPaths(appendA, table.io())); + expectedDeletes.add(branchDelete.manifestListLocation()); + expectedDeletes.addAll(manifestPaths(branchDelete, table.io())); +- expectedDeletes.add(FILE_A.path().toString()); ++ // expectedDeletes.add(FILE_A.path().toString()); + + Assert.assertEquals(2, Iterables.size(table.snapshots())); + Assert.assertEquals(expectedDeletes, deletedFiles); +-- +2.39.2 (Apple Git-143) diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index feb32199d30..5fb864afb77 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -792,6 +792,47 @@ ], "sqlState" : "42K09" }, + "DELTA_ICEBERG_COMPAT_V1_VIOLATION" : { + "message" : [ + "The validation of IcebergCompatV1 has failed." 
+ ], + "subClass" : { + "DISABLING_REQUIRED_TABLE_FEATURE" : { + "message" : [ + "IcebergCompatV1 requires feature to be supported and enabled. You cannot remove it from the table. Instead, please disable IcebergCompatV1 first." + ] + }, + "INCOMPATIBLE_TABLE_FEATURE" : { + "message" : [ + "IcebergCompatV1 is incompatible with feature ." + ] + }, + "MISSING_REQUIRED_TABLE_FEATURE" : { + "message" : [ + "IcebergCompatV1 requires feature to be supported and enabled." + ] + }, + "REPLACE_TABLE_CHANGE_PARTITION_NAMES" : { + "message" : [ + "IcebergCompatV1 doesn't support replacing partitioned tables with a differently-named partition spec, because Iceberg-Spark 1.1.0 doesn't.", + "Prev Partition Spec: ", + "New Partition Spec: " + ] + }, + "UNSUPPORTED_DATA_TYPE" : { + "message" : [ + "IcebergCompatV1 doesn't support schema with MapType or ArrayType or NullType. Your schema:", + "" + ] + }, + "WRONG_REQUIRED_TABLE_PROPERTY" : { + "message" : [ + "IcebergCompatV1 requires table property '' to be set to ''. Current value: ''." + ] + } + }, + "sqlState" : "KD00E" + }, "DELTA_ILLEGAL_FILE_FOUND" : { "message" : [ "Illegal files found in a dataChange = false transaction. Files: " @@ -1902,6 +1943,12 @@ ], "sqlState" : "XXKDS" }, + "DELTA_UNIVERSAL_FORMAT_VIOLATION" : { + "message" : [ + "The validation of Universal Format () has failed: " + ], + "sqlState" : "KD00E" + }, "DELTA_UNKNOWN_CONFIGURATION" : { "message" : [ "Unknown configuration was specified: ", diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala index adb52e61ca5..0f4b991e259 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala @@ -105,6 +105,17 @@ trait DeltaConfigsBase extends DeltaLogging { cal } + /** + * The prefix for a category of special configs for delta universal format to support the + * user facing config naming convention for different table formats: + * "delta.universalFormat.config.[iceberg/hudi].[config_name]" + * Note that config_name can be arbitrary. + */ + final val DELTA_UNIVERSAL_FORMAT_CONFIG_PREFIX = "delta.universalformat.config." + + final val DELTA_UNIVERSAL_FORMAT_ICEBERG_CONFIG_PREFIX = + s"${DELTA_UNIVERSAL_FORMAT_CONFIG_PREFIX}iceberg." + /** * A global default value set as a SQLConf will overwrite the default value of a DeltaConfig. * For example, user can run: @@ -624,6 +635,31 @@ trait DeltaConfigsBase extends DeltaLogging { fromString = _.toBoolean, validationFunction = _ => true, helpMessage = "needs to be a boolean.") + + /** + * Convert the table's metadata into other storage formats after each Delta commit. + * Only Iceberg is supported for now + */ + val UNIVERSAL_FORMAT_ENABLED_FORMATS = buildConfig[Seq[String]]( + "universalFormat.enabledFormats", + "", + fromString = str => + if (str == null || str.isEmpty) Nil + else str.split(","), + validationFunction = seq => + if (seq.distinct.length != seq.length) false + else seq.toSet.subsetOf(UniversalFormat.SUPPORTED_FORMATS), + s"Must be a comma-separated list of formats from the list: " + + s"${UniversalFormat.SUPPORTED_FORMATS.mkString("{", ",", "}")}." + ) + + val ICEBERG_COMPAT_V1_ENABLED = buildConfig[Option[Boolean]]( + "enableIcebergCompatV1", + null, + v => Option(v).map(_.toBoolean), + _ => true, + "needs to be a boolean." 
+ ) } object DeltaConfigs extends DeltaConfigsBase diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index ac20bbe14a2..79641c3b9c6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -2818,6 +2818,67 @@ trait DeltaErrorsBase messageParameters = Array(domainNames) ) } + + def uniFormIcebergRequiresIcebergCompatV1(): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_UNIVERSAL_FORMAT_VIOLATION", + messageParameters = Array( + UniversalFormat.ICEBERG_FORMAT, + "Requires IcebergCompatV1 to be manually enabled in order for Universal Format (Iceberg) " + + "to be enabled on an existing table." + ) + ) + } + + def icebergCompatV1ReplacePartitionedTableException( + prevPartitionCols: Seq[String], + newPartitionCols: Seq[String]): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_ICEBERG_COMPAT_V1_VIOLATION.REPLACE_TABLE_CHANGE_PARTITION_NAMES", + messageParameters = Array( + prevPartitionCols.mkString("(", ",", ")"), + newPartitionCols.mkString("(", ",", ")") + ) + ) + } + + def icebergCompatV1UnsupportedDataTypeException(schema: StructType): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_ICEBERG_COMPAT_V1_VIOLATION.UNSUPPORTED_DATA_TYPE", + messageParameters = Array(schema.treeString) + ) + } + + def icebergCompatV1MissingRequiredTableFeatureException(tf: TableFeature): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_ICEBERG_COMPAT_V1_VIOLATION.MISSING_REQUIRED_TABLE_FEATURE", + messageParameters = Array(tf.toString) + ) + } + + def icebergCompatV1DisablingRequiredTableFeatureException(tf: TableFeature): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_ICEBERG_COMPAT_V1_VIOLATION.DISABLING_REQUIRED_TABLE_FEATURE", + messageParameters = Array(tf.toString) + ) + } + + def icebergCompatV1IncompatibleTableFeatureException(tf: TableFeature): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_ICEBERG_COMPAT_V1_VIOLATION.INCOMPATIBLE_TABLE_FEATURE", + messageParameters = Array(tf.toString) + ) + } + + def icebergCompatV1WrongRequiredTablePropertyException( + key: String, + actualValue: String, + requiredValue: String): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_ICEBERG_COMPAT_V1_VIOLATION.WRONG_REQUIRED_TABLE_PROPERTY", + messageParameters = Array(key, requiredValue, actualValue) + ) + } } object DeltaErrors extends DeltaErrorsBase diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaFileProviderUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaFileProviderUtils.scala new file mode 100644 index 00000000000..cb14d3fb7f9 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaFileProviderUtils.scala @@ -0,0 +1,92 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.actions.Action +import org.apache.spark.sql.delta.storage.ClosableIterator +import org.apache.spark.sql.delta.util.FileNames.DeltaFile +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JsonToStructs +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +object DeltaFileProviderUtils { + + protected def readThreadPool = SnapshotManagement.deltaLogAsyncUpdateThreadPool + + /** Put any future parsing options here. */ + val jsonStatsParseOption = Map.empty[String, String] + + private[delta] def createJsonStatsParser(schemaToUse: StructType): String => InternalRow = { + val parser = JsonToStructs( + schema = schemaToUse, + options = jsonStatsParseOption, + child = null, + timeZoneId = Some(SQLConf.get.sessionLocalTimeZone) + ) + (json: String) => { + val utf8json = UTF8String.fromString(json) + parser.nullSafeEval(utf8json).asInstanceOf[InternalRow] + } + } + + /** + * Get the Delta json files present in the delta log in the range [startVersion, endVersion]. + * Returns the files in sorted order, and throws if any in the range are missing. + */ + def getDeltaFilesInVersionRange( + spark: SparkSession, + deltaLog: DeltaLog, + startVersion: Long, + endVersion: Long): Seq[FileStatus] = { + val result = deltaLog + .listFrom(startVersion) + .collect { case DeltaFile(fs, v) if v <= endVersion => (fs, v) } + .toSeq + // Verify that we got the entire range requested + if (result.size.toLong != endVersion - startVersion + 1) { + throw DeltaErrors.deltaVersionsNotContiguousException(spark, result.map(_._2)) + } + result.map(_._1) + } + + /** Helper method to read and parse the delta files parallelly into [[Action]]s. 
*/ + def parallelReadAndParseDeltaFilesAsIterator( + deltaLog: DeltaLog, + spark: SparkSession, + files: Seq[FileStatus]): Seq[ClosableIterator[String]] = { + val hadoopConf = deltaLog.newDeltaHadoopConf() + parallelReadDeltaFilesBase(spark, files, hadoopConf, { file: FileStatus => + deltaLog.store.readAsIterator(file, hadoopConf) + }) + } + + protected def parallelReadDeltaFilesBase[A]( + spark: SparkSession, + files: Seq[FileStatus], + hadoopConf: Configuration, + f: FileStatus => A): Seq[A] = { + readThreadPool.parallelMap(spark, files) { file => + f(file) + }.toSeq + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index 91e45395e3d..d41174020af 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -82,6 +82,7 @@ class DeltaLog private( with LogStoreProvider with SnapshotManagement with DeltaFileFormat + with ProvidesUniFormConverters with ReadChecksum { import org.apache.spark.sql.delta.files.TahoeFileIndex diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompatV1.scala b/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompatV1.scala new file mode 100644 index 00000000000..030290c2db5 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompatV1.scala @@ -0,0 +1,215 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, Protocol} +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.schema.SchemaUtils + +import org.apache.spark.sql.types.{ArrayType, MapType, NullType} + +/** + * Utils to validate the IcebergCompatV1 table feature, which is responsible for keeping Delta + * tables in valid states (see the Delta spec for full invariants, dependencies, and requirements) + * so that they are capable of having Delta to Iceberg metadata conversion applied to them. The + * IcebergCompatV1 table feature does not implement, specify, or control the actual metadata + * conversion; that is handled by the Delta UniForm feature. + * + * Note that UniForm (Iceberg) depends on IcebergCompatV1, but IcebergCompatV1 does not depend on or + * require UniForm (Iceberg). It is perfectly valid for a Delta table to have IcebergCompatV1 + * enabled but UniForm (Iceberg) not enabled. 
+ */ +object IcebergCompatV1 extends DeltaLogging { + + val REQUIRED_TABLE_FEATURES = Seq(ColumnMappingTableFeature) + + val INCOMPATIBLE_TABLE_FEATURES = Seq(DeletionVectorsTableFeature) + + val REQUIRED_DELTA_TABLE_PROPERTIES = Seq( + RequiredDeltaTableProperty( + deltaConfig = DeltaConfigs.COLUMN_MAPPING_MODE, + validator = (mode: DeltaColumnMappingMode) => (mode == NameMapping || mode == IdMapping), + autoSetValue = NameMapping.name + ) + ) + + def isEnabled(metadata: Metadata): Boolean = { + DeltaConfigs.ICEBERG_COMPAT_V1_ENABLED.fromMetaData(metadata).getOrElse(false) + } + + /** + * Expected to be called after the newest metadata and protocol have been ~ finalized. + * + * Furthermore, this should be called *after* + * [[UniversalFormat.enforceIcebergInvariantsAndDependencies]]. + * + * If you are enabling IcebergCompatV1 and are creating a new table, this method will + * automatically upgrade the table protocol to support ColumnMapping and set it to 'name' mode, + * too. + * + * If you are disabling IcebergCompatV1, this method will also disable Universal Format (Iceberg), + * if it is enabled. + * + * @param actions The actions to be committed in the txn. We will only look at the [[AddFile]]s. + * + * @return tuple of options of (updatedProtocol, updatedMetadata). For either action, if no + * updates need to be applied, will return None. + */ + def enforceInvariantsAndDependencies( + prevProtocol: Protocol, + prevMetadata: Metadata, + newestProtocol: Protocol, + newestMetadata: Metadata, + isCreatingNewTable: Boolean, + actions: Seq[Action]): (Option[Protocol], Option[Metadata]) = { + val wasEnabled = IcebergCompatV1.isEnabled(prevMetadata) + val isEnabled = IcebergCompatV1.isEnabled(newestMetadata) + val tableId = newestMetadata.id + + (wasEnabled, isEnabled) match { + case (false, false) => (None, None) // Ignore + case (true, false) => // Disabling + // UniversalFormat.validateIceberg should detect that IcebergCompatV1 is being disabled, + // and automatically disable Universal Format (Iceberg) + assert(!UniversalFormat.icebergEnabled(newestMetadata)) + (None, None) + case (_, true) => // Enabling now or already-enabled + val tblFeatureUpdates = scala.collection.mutable.Set.empty[TableFeature] + val tblPropertyUpdates = scala.collection.mutable.Map.empty[String, String] + + // Note: Delta doesn't support partition evolution, but you can change the partitionColumns + // by doing a REPLACE or DataFrame overwrite. 
+ // + // Iceberg-Spark itself *doesn't* support the following cases + // - CREATE TABLE partitioned by colA; REPLACE TABLE partitioned by colB + // - CREATE TABLE partitioned by colA; REPLACE TABLE not partitioned + // + // While Iceberg-Spark *does* support + // - CREATE TABLE not partitioned; REPLACE TABLE not partitioned + // - CREATE TABLE not partitioned; REPLACE TABLE partitioned by colA + // - CREATE TABLE partitioned by colA dataType1; REPLACE TABLE partitioned by colA dataType2 + if (prevMetadata.partitionColumns.nonEmpty && + prevMetadata.partitionColumns != newestMetadata.partitionColumns) { + throw DeltaErrors.icebergCompatV1ReplacePartitionedTableException( + prevMetadata.partitionColumns, newestMetadata.partitionColumns) + } + + if (SchemaUtils.typeExistsRecursively(newestMetadata.schema) { f => + f.isInstanceOf[MapType] || f.isInstanceOf[ArrayType] || f.isInstanceOf[NullType] + }) { + throw DeltaErrors.icebergCompatV1UnsupportedDataTypeException(newestMetadata.schema) + } + + // If this field is empty, then the AddFile is missing the `numRecords` statistic. + actions.collect { case a: AddFile if a.numLogicalRecords.isEmpty => + throw new UnsupportedOperationException(s"[tableId=$tableId] IcebergCompatV1 requires " + + s"all AddFiles to contain the numRecords statistic. AddFile ${a.path} is missing " + + s"this statistic. Stats: ${a.stats}") + } + + // Check we have all required table features + REQUIRED_TABLE_FEATURES.foreach { f => + (prevProtocol.isFeatureSupported(f), newestProtocol.isFeatureSupported(f)) match { + case (_, true) => // all good + case (false, false) => // txn has not supported it! + // Note: this code path should be impossible, since the IcebergCompatV1TableFeature + // specifies ColumnMappingTableFeature as a required table feature. Thus, + // it should already have been added during + // OptimisticTransaction::updateMetadataInternal + if (isCreatingNewTable) { + tblFeatureUpdates += f + } else { + throw DeltaErrors.icebergCompatV1MissingRequiredTableFeatureException(f) + } + case (true, false) => // txn is removing/un-supporting it! + // Note: currently it is impossible to remove/un-support a table feature + throw DeltaErrors.icebergCompatV1DisablingRequiredTableFeatureException(f) + } + } + + // Check we haven't added any incompatible table features + INCOMPATIBLE_TABLE_FEATURES.foreach { f => + if (newestProtocol.isFeatureSupported(f)) { + throw DeltaErrors.icebergCompatV1IncompatibleTableFeatureException(f) + } + } + + // Check we have all required delta table properties + REQUIRED_DELTA_TABLE_PROPERTIES.foreach { + case RequiredDeltaTableProperty(deltaConfig, validator, autoSetValue) => + val newestValue = deltaConfig.fromMetaData(newestMetadata) + val newestValueOkay = validator(newestValue) + val newestValueExplicitlySet = newestMetadata.configuration.contains(deltaConfig.key) + + val err = DeltaErrors.icebergCompatV1WrongRequiredTablePropertyException( + deltaConfig.key, newestValue.toString, autoSetValue) + + if (!newestValueOkay) { + if (!newestValueExplicitlySet && isCreatingNewTable) { + // This case covers both CREATE and REPLACE TABLE commands that + // did not explicitly specify the required deltaConfig. In these + // cases, we set the property automatically. 
+ tblPropertyUpdates += deltaConfig.key -> autoSetValue + } else { + // In all other cases, if the property value is not compatible + // with the IcebergV1 requirements, we fail + throw err + } + } + } + + val protocolResult = if (tblFeatureUpdates.nonEmpty) { + logInfo(s"[tableId=$tableId] IcebergCompatV1 auto-supporting table features: " + + s"${tblFeatureUpdates.map(_.name)}") + Some(newestProtocol.merge(tblFeatureUpdates.map(Protocol.forTableFeature).toSeq: _*)) + } else None + + val metadataResult = if (tblPropertyUpdates.nonEmpty) { + logInfo(s"[tableId=$tableId] IcebergCompatV1 auto-setting table properties: " + + s"$tblPropertyUpdates") + val newConfiguration = newestMetadata.configuration ++ tblPropertyUpdates.toMap + var tmpNewMetadata = newestMetadata.copy(configuration = newConfiguration) + + if (tblPropertyUpdates.contains(DeltaConfigs.COLUMN_MAPPING_MODE.key)) { + assert(isCreatingNewTable, "we only auto-upgrade Column Mapping on new tables") + tmpNewMetadata = DeltaColumnMapping.assignColumnIdAndPhysicalName( + newMetadata = tmpNewMetadata, + oldMetadata = prevMetadata, + isChangingModeOnExistingTable = false + ) + DeltaColumnMapping.checkColumnIdAndPhysicalNameAssignments(tmpNewMetadata) + } + + Some(tmpNewMetadata) + } else None + + (protocolResult, metadataResult) + } + } +} + +/** + * Wrapper class for table property validation + * + * @param deltaConfig [[DeltaConfig]] we are checking + * @param validator A generic method to validate the given value + * @param autoSetValue The value to set if we can auto-set this value (e.g. during table creation) + */ +case class RequiredDeltaTableProperty[T]( + deltaConfig: DeltaConfig[T], + validator: T => Boolean, + autoSetValue: String) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index e03b1951796..12b3202b674 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.commands.DeletionVectorUtils import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.files._ -import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, PostCommitHook} +import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, IcebergConverterHook, PostCommitHook} import org.apache.spark.sql.delta.implicits.addFileEncoder import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} @@ -314,6 +314,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite protected val postCommitHooks = new ArrayBuffer[PostCommitHook]() // The CheckpointHook will only checkpoint if necessary, so always register it to run. registerPostCommitHook(CheckpointHook) + registerPostCommitHook(IcebergConverterHook) /** The protocol of the snapshot that this transaction is reading at. 
*/ def protocol: Protocol = newProtocol.getOrElse(snapshot.protocol) @@ -1319,6 +1320,28 @@ trait OptimisticTransactionImpl extends TransactionalWrite // Now, we know that there is at most 1 Metadata change (stored in newMetadata) and at most 1 // Protocol change (stored in newProtocol) + val (protocolUpdate1, metadataUpdate1) = + UniversalFormat.enforceIcebergInvariantsAndDependencies( + // Note: if this txn has no protocol or metadata updates, then `prev` will equal `newest`. + prevProtocol = snapshot.protocol, + prevMetadata = snapshot.metadata, + newestProtocol = protocol, // Note: this will try to use `newProtocol` + newestMetadata = metadata, // Note: this will try to use `newMetadata` + isCreatingNewTable + ) + newProtocol = protocolUpdate1.orElse(newProtocol) + newMetadata = metadataUpdate1.orElse(newMetadata) + + val (protocolUpdate2, metadataUpdate2) = IcebergCompatV1.enforceInvariantsAndDependencies( + prevProtocol = snapshot.protocol, + prevMetadata = snapshot.metadata, + newestProtocol = protocol, // Note: this will try to use `newProtocol` + newestMetadata = metadata, // Note: this will try to use `newMetadata` + isCreatingNewTable, + otherActions + ) + newProtocol = protocolUpdate2.orElse(newProtocol) + newMetadata = metadataUpdate2.orElse(newMetadata) var finalActions = newMetadata.toSeq ++ newProtocol.toSeq ++ otherActions @@ -1807,6 +1830,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite } } + private[delta] def unregisterPostCommitHooksWhere(predicate: PostCommitHook => Boolean): Unit = + postCommitHooks --= postCommitHooks.filter(predicate) + protected lazy val logPrefix: String = { def truncate(uuid: String): String = uuid.split("-").head s"[tableId=${truncate(snapshot.metadata.id)},txnId=${truncate(txnId)}] " diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala new file mode 100644 index 00000000000..6803d1a996c --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ProvidesUniFormConverters.scala @@ -0,0 +1,52 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import java.lang.reflect.InvocationTargetException + +import org.apache.commons.lang3.exception.ExceptionUtils + +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.Utils + +trait ProvidesUniFormConverters { self: DeltaLog => + /** + * Helper trait to instantiate the icebergConverter member variable of the [[DeltaLog]]. We do + * this through reflection so that delta-spark doesn't have a compile-time dependency on the + * shaded iceberg module. 
+ */ + protected lazy val _icebergConverter: UniversalFormatConverter = try { + val clazz = + Utils.classForName("org.apache.spark.sql.delta.icebergShaded.IcebergConverter") + val constructor = clazz.getConstructor(classOf[SparkSession]) + constructor.newInstance(spark) + } catch { + case e: ClassNotFoundException => + logError(s"Failed to find Iceberg converter class", e) + throw DeltaErrors.icebergClassMissing(spark.sparkContext.getConf, e) + case e: InvocationTargetException => + logError(s"Got error when creating an Iceberg converter", e) + // The better error is within the cause + throw ExceptionUtils.getRootCause(e) + } + + /** Visible for tests (to be able to mock). */ + private[delta] var testIcebergConverter: Option[UniversalFormatConverter] = None + + def icebergConverter: UniversalFormatConverter = testIcebergConverter.getOrElse(_icebergConverter) +} + diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 171a3a073e4..a44b62ed714 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -221,6 +221,7 @@ object TableFeature { InvariantsTableFeature, ColumnMappingTableFeature, TimestampNTZTableFeature, + IcebergCompatV1TableFeature, DeletionVectorsTableFeature) if (DeltaUtils.isTesting) { features ++= Set( @@ -350,6 +351,18 @@ object RowTrackingFeature extends WriterFeature(name = "rowTracking") object DomainMetadataTableFeature extends WriterFeature(name = "domainMetadata") +object IcebergCompatV1TableFeature extends WriterFeature(name = "icebergCompatV1") + with FeatureAutomaticallyEnabledByMetadata { + + override def automaticallyUpdateProtocolOfExistingTables: Boolean = true + + override def metadataRequiresFeatureToBeEnabled( + metadata: Metadata, + spark: SparkSession): Boolean = IcebergCompatV1.isEnabled(metadata) + + override def requiredFeatures: Set[TableFeature] = Set(ColumnMappingTableFeature) +} + /** * Features below are for testing only, and are being registered to the system only in the testing diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala new file mode 100644 index 00000000000..174652f0d34 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala @@ -0,0 +1,168 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.actions.{Metadata, Protocol} +import org.apache.spark.sql.delta.metering.DeltaLogging + +import org.apache.spark.sql.SparkSession + +/** + * Utils to validate the Universal Format (UniForm) Delta feature (NOT a table feature). + * + * The UniForm Delta feature governs and implements the actual conversion of Delta metadata into + * other formats. 
+ * + * Currently, UniForm only supports Iceberg. When `delta.universalFormat.enabledFormats` contains + * "iceberg", we say that Universal Format (Iceberg) is enabled. + * + * [[enforceIcebergInvariantsAndDependencies]] ensures that all of UniForm (Iceberg)'s requirements + * are met (i.e. that IcebergCompatV1 is enabled). It doesn't verify that its nested requirements + * are met (i.e. IcebergCompatV1's requirements, like Column Mapping). That is the responsibility of + * [[IcebergCompatV1.enforceInvariantsAndDependencies]]. + * + * + * Note that UniForm (Iceberg) depends on IcebergCompatV1, but IcebergCompatV1 does not depend on or + * require UniForm (Iceberg). It is perfectly valid for a Delta table to have IcebergCompatV1 + * enabled but UniForm (Iceberg) not enabled. + */ +object UniversalFormat extends DeltaLogging { + + val ICEBERG_FORMAT = "iceberg" + val SUPPORTED_FORMATS = Set(ICEBERG_FORMAT) + + def icebergEnabled(metadata: Metadata): Boolean = { + DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.fromMetaData(metadata).contains(ICEBERG_FORMAT) + } + + def icebergEnabled(properties: Map[String, String]): Boolean = { + properties.get(DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.key) + .exists(value => value.contains(ICEBERG_FORMAT)) + } + + /** + * Expected to be called after the newest metadata and protocol have been ~ finalized. + * + * Furthermore, this should be called *before* + * [[IcebergCompatV1.enforceInvariantsAndDependencies]]. + * + * If you are enabling Universal Format (Iceberg), this method ensures that IcebergCompatV1 is + * supported and enabled. If this is a new table, IcebergCompatV1 will be automatically enabled. + * + * If you are disabling Universal Format (Iceberg), this method ensures that IcebergCompatV1 is + * disabled. It may still be supported, however. + * + * @return tuple of options of (updatedProtocol, updatedMetadata). For either action, if no + * updates need to be applied, will return None. + */ + def enforceIcebergInvariantsAndDependencies( + prevProtocol: Protocol, + prevMetadata: Metadata, + newestProtocol: Protocol, + newestMetadata: Metadata, + isCreatingNewTable: Boolean): (Option[Protocol], Option[Metadata]) = { + val uniformIcebergWasEnabled = UniversalFormat.icebergEnabled(prevMetadata) + val uniformIcebergIsEnabled = UniversalFormat.icebergEnabled(newestMetadata) + val tableId = newestMetadata.id + + (uniformIcebergWasEnabled, uniformIcebergIsEnabled) match { + case (false, false) => (None, None) // Ignore + case (true, false) => // Disabling! + if (!IcebergCompatV1.isEnabled(newestMetadata)) { + (None, None) + } else { + logInfo(s"[tableId=$tableId] Universal Format (Iceberg): This feature is being " + + "disabled. Auto-disabling IcebergCompatV1, too.") + + val newConfiguration = newestMetadata.configuration ++ + Map(DeltaConfigs.ICEBERG_COMPAT_V1_ENABLED.key -> "false") + + (None, Some(newestMetadata.copy(configuration = newConfiguration))) + } + case (_, true) => // Enabling now or already-enabled + val icebergCompatV1WasEnabled = IcebergCompatV1.isEnabled(prevMetadata) + val icebergCompatV1IsEnabled = IcebergCompatV1.isEnabled(newestMetadata) + + if (icebergCompatV1IsEnabled) { + (None, None) + } else if (isCreatingNewTable) { + // We need to handle the isCreatingNewTable case first because in the the case of + // a REPLACE TABLE, it could be that icebergCompatV1IsEnabled is false, if it + // is not explicitly specified as part of the REPLACE command, but + // icebergCompatV1WasEnabled is true, if it was set on the previous table. 
In this + // case, we do not want to auto disable Uniform but rather set its dependencies + // automatically, the same way as is done for CREATE. + logInfo(s"[tableId=$tableId] Universal Format (Iceberg): Creating a new table " + + s"with Universal Format (Iceberg) enabled, but IcebergCompatV1 is not yet enabled. " + + s"Auto-supporting and enabling IcebergCompatV1 now.") + val protocolResult = Some( + newestProtocol.merge(Protocol.forTableFeature(IcebergCompatV1TableFeature)) + ) + val metadataResult = Some( + newestMetadata.copy( + configuration = newestMetadata.configuration ++ + Map(DeltaConfigs.ICEBERG_COMPAT_V1_ENABLED.key -> "true") + ) + ) + + (protocolResult, metadataResult) + } else if (icebergCompatV1WasEnabled) { + // IcebergCompatV1 is being disabled. We need to also disable Universal Format (Iceberg) + val remainingSupportedFormats = DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS + .fromMetaData(newestMetadata) + .filterNot(_ == UniversalFormat.ICEBERG_FORMAT) + + val newConfiguration = if (remainingSupportedFormats.isEmpty) { + newestMetadata.configuration - DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.key + } else { + newestMetadata.configuration ++ + Map(DeltaConfigs.UNIVERSAL_FORMAT_ENABLED_FORMATS.key -> + remainingSupportedFormats.mkString(",")) + } + + logInfo(s"[tableId=$tableId] IcebergCompatV1 is being disabled. Auto-disabling " + + "Universal Format (Iceberg), too.") + + (None, Some(newestMetadata.copy(configuration = newConfiguration))) + } else { + throw DeltaErrors.uniFormIcebergRequiresIcebergCompatV1() + } + } + } + +} + +/** Class to facilitate the conversion of Delta into other table formats. */ +abstract class UniversalFormatConverter(spark: SparkSession) { + /** + * Perform an asynchronous conversion. + * + * This will start an async job to run the conversion, unless there already is an async conversion + * running for this table. In that case, it will queue up the provided snapshot to be run after + * the existing job completes. + */ + def enqueueSnapshotForConversion( + snapshotToConvert: Snapshot, + txn: Option[OptimisticTransactionImpl]): Unit + + /** Perform a blocking conversion. 
*/ + def convertSnapshot( + snapshotToConvert: Snapshot, + txnOpt: Option[OptimisticTransactionImpl]): Option[(Long, Long)] + + def loadLastDeltaVersionConverted(snapshot: Snapshot): Option[Long] +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala index 76f07ac7d85..2f27d52efd8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.DeltaColumnMapping.{dropColumnMappingMetadata, filterColumnMappingProperties} import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol} import org.apache.spark.sql.delta.actions.DomainMetadata +import org.apache.spark.sql.delta.hooks.IcebergConverterHook import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -109,6 +110,11 @@ case class CreateDeltaTableCommand( recordDeltaOperation(deltaLog, "delta.ddl.createTable") { val txn = deltaLog.startTransaction() + + // During CREATE/REPLACE, we synchronously run conversion (if Uniform is enabled) so + // we always remove the post commit hook here. + txn.unregisterPostCommitHooksWhere(hook => hook.name == IcebergConverterHook.name) + val opStartTs = System.currentTimeMillis() if (query.isDefined) { // If the mode is Ignore or ErrorIfExists, the table must not exist, or we would return @@ -269,6 +275,10 @@ case class CreateDeltaTableCommand( updateCatalog(sparkSession, tableWithLocation, snapshot, txn) + if (UniversalFormat.icebergEnabled(snapshot.metadata)) { + deltaLog.icebergConverter.convertSnapshot(snapshot, None) + } + result } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/IcebergConverterHook.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/IcebergConverterHook.scala new file mode 100644 index 00000000000..aca8907b711 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/IcebergConverterHook.scala @@ -0,0 +1,51 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.hooks + +import org.apache.spark.sql.delta.{OptimisticTransactionImpl, Snapshot, UniversalFormat} +import org.apache.spark.sql.delta.actions.Action +import org.apache.spark.sql.delta.metering.DeltaLogging + +import org.apache.spark.sql.SparkSession + +/** Write a new Iceberg metadata file at the version committed by the txn, if required. 
*/ +object IcebergConverterHook extends PostCommitHook with DeltaLogging { + override val name: String = "Post-commit Iceberg metadata conversion" + + val ASYNC_ICEBERG_CONVERTER_THREAD_NAME = "async-iceberg-converter" + + override def run( + spark: SparkSession, + txn: OptimisticTransactionImpl, + committedVersion: Long, + postCommitSnapshot: Snapshot, + committedActions: Seq[Action]): Unit = { + // Only convert to Iceberg if the snapshot matches the version committed. + // This is to skip converting the same actions multiple times - they'll be written out + // by another commit anyways. + if (committedVersion != postCommitSnapshot.version || + !UniversalFormat.icebergEnabled(postCommitSnapshot.metadata)) { + return + } + + + postCommitSnapshot + .deltaLog + .icebergConverter + .enqueueSnapshotForConversion(postCommitSnapshot, Some(txn)) + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index f6a290c94ee..38844d84fc3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1205,6 +1205,24 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val ICEBERG_MAX_COMMITS_TO_CONVERT = buildConf("iceberg.maxPendingCommits") + .doc(""" + |The maximum number of pending Delta commits to convert to Iceberg incrementally. + |If the table hasn't been converted to Iceberg in longer than this number of commits, + |we start from scratch, replacing the previously converted Iceberg table contents. + |""".stripMargin) + .intConf + .createWithDefault(100) + + val ICEBERG_MAX_ACTIONS_TO_CONVERT = buildConf("iceberg.maxPendingActions") + .doc(""" + |The maximum number of pending Delta actions to convert to Iceberg incrementally. + |If there are more than this number of outstanding actions, chunk them into separate + |Iceberg commits. + |""".stripMargin) + .intConf + .createWithDefault(100 * 1000) + } object DeltaSQLConf extends DeltaSQLConfBase diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaThreadPool.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaThreadPool.scala index 51d3a544fe8..4480937da53 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaThreadPool.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaThreadPool.scala @@ -31,6 +31,19 @@ private[delta] class DeltaThreadPool(tpe: ThreadPoolExecutor) { /** Submits a task for execution and returns a [[Future]] representing that task. */ def submit[T](spark: SparkSession)(body: => T): Future[T] = Future[T](spark.withActive(body)) + + /** + * Executes `f` on each element of `items` as a task and returns the result. + * Throws a [[SparkException]] if a timeout occurs. + */ + def parallelMap[T, R]( + spark: SparkSession, + items: Iterable[T], + timeout: Duration = Duration.Inf)( + f: T => R): Iterable[R] = { + val futures = items.map(i => submit(spark)(f(i))) + ThreadUtils.awaitResult(Future.sequence(futures), timeout) + } }
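A minimal usage sketch of the new `DeltaThreadPool.parallelMap` helper, mirroring how `parallelReadDeltaFilesBase` drives it; the `ParallelLogReadExample` object and the closure body are illustrative assumptions, and the real caller reads each file with `deltaLog.store.readAsIterator`:

```scala
package org.apache.spark.sql.delta // DeltaThreadPool is private[delta], so callers live in this package tree

import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.util.DeltaThreadPool

object ParallelLogReadExample {
  // Run one task per Delta log file on the shared thread pool and collect the results in order;
  // the closure body here is a stand-in for the real per-file read.
  def readAllInParallel(
      pool: DeltaThreadPool,
      spark: SparkSession,
      files: Seq[FileStatus]): Seq[String] = {
    pool.parallelMap(spark, files) { file =>
      file.getPath.toString
    }.toSeq
  }
}
```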