diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala
index 4c2949c8565..1f05e13ca4d 100644
--- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.icebergShaded.IcebergTransactionUtils._
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.SchemaUtils
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.conf.Configuration
 import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
@@ -37,6 +38,7 @@ import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
 import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser
 
 import org.apache.spark.internal.MDC
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 
 sealed trait IcebergTableOp
@@ -56,6 +58,7 @@ case object REPLACE_TABLE extends IcebergTableOp
  * @param lastConvertedDeltaVersion the delta version this Iceberg txn starts from.
  */
 class IcebergConversionTransaction(
+    protected val spark: SparkSession,
     protected val catalogTable: CatalogTable,
     protected val conf: Configuration,
     protected val postCommitSnapshot: Snapshot,
@@ -342,11 +345,18 @@ class IcebergConversionTransaction(
     // possible legitimate Delta version which is 0.
     val deltaVersion = if (tableOp == CREATE_TABLE) -1 else postCommitSnapshot.version
 
-    txn.updateProperties()
-      .set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString)
+    var updateTxn = txn.updateProperties()
+    updateTxn = updateTxn.set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString)
       .set(IcebergConverter.DELTA_TIMESTAMP_PROPERTY, postCommitSnapshot.timestamp.toString)
       .set(IcebergConstants.ICEBERG_NAME_MAPPING_PROPERTY, nameMapping)
-      .commit()
+
+    if (spark.sessionState.conf.getConf(
+        DeltaSQLConf.DELTA_UNIFORM_ICEBERG_INCLUDE_BASE_CONVERTED_VERSION)) {
+      lastConvertedDeltaVersion.foreach { v =>
+        updateTxn = updateTxn.set(IcebergConverter.BASE_DELTA_VERSION_PROPERTY, v.toString)
+      }
+    }
+    updateTxn.commit()
 
     // We ensure the iceberg txns are serializable by only allowing them to commit against
     // lastConvertedIcebergSnapshotId.
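The property-update rewrite above works because Iceberg's `UpdateProperties.set` returns the updater itself, so conditional properties can be accumulated on a `var` before a single `commit()`. A minimal standalone sketch of that shape, with a hypothetical `PropsBuilder` standing in for the shaded Iceberg `UpdateProperties` (names and values are illustrative, not from the PR):

```scala
// Sketch only: PropsBuilder stands in for Iceberg's UpdateProperties, whose
// set(...) also returns the updater so calls can be chained or reassigned.
final class PropsBuilder {
  private val props = scala.collection.mutable.LinkedHashMap.empty[String, String]
  def set(key: String, value: String): PropsBuilder = { props += key -> value; this }
  def commit(): Map[String, String] = props.toMap
}

// Mirrors the diff: always stamp version/timestamp, and only stamp the base
// version when the flag is on and a previous conversion exists.
def stampProperties(
    includeBaseVersion: Boolean,                       // the new feature flag
    lastConvertedDeltaVersion: Option[Long]): Map[String, String] = {
  var updateTxn = new PropsBuilder()
    .set("delta-version", "12")                        // illustrative values
    .set("delta-timestamp", "1700000000000")
  if (includeBaseVersion) {
    lastConvertedDeltaVersion.foreach { v =>
      updateTxn = updateTxn.set("base-delta-version", v.toString)
    }
  }
  updateTxn.commit()
}
```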
diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala
index de432f48303..ec4385336ab 100644
--- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.fs.Path
 import shadedForDelta.org.apache.iceberg.{Table => IcebergTable}
+import shadedForDelta.org.apache.iceberg.exceptions.CommitFailedException
 import shadedForDelta.org.apache.iceberg.hive.{HiveCatalog, HiveTableOperations}
 
 import org.apache.spark.internal.MDC
@@ -54,8 +55,17 @@ object IcebergConverter {
    */
   val DELTA_TIMESTAMP_PROPERTY = "delta-timestamp"
 
+  /**
+   * Property to be set in translated Iceberg metadata files.
+   * Indicates the base delta commit version # that the conversion started from
+   */
+  val BASE_DELTA_VERSION_PROPERTY = "base-delta-version"
+
   def getLastConvertedDeltaVersion(table: Option[IcebergTable]): Option[Long] =
     table.flatMap(_.properties().asScala.get(DELTA_VERSION_PROPERTY)).map(_.toLong)
+
+  def getLastConvertedDeltaTimestamp(table: Option[IcebergTable]): Option[Long] =
+    table.flatMap(_.properties().asScala.get(DELTA_TIMESTAMP_PROPERTY)).map(_.toLong)
 }
 
 /**
@@ -177,7 +187,7 @@ class IcebergConverter(spark: SparkSession)
   override def convertSnapshot(
       snapshotToConvert: Snapshot, catalogTable: CatalogTable): Option[(Long, Long)] = {
     try {
-      convertSnapshot(snapshotToConvert, None, catalogTable)
+      convertSnapshotWithRetry(snapshotToConvert, None, catalogTable)
     } catch {
       case NonFatal(e) =>
         logError(log"Error when converting to Iceberg metadata", e)
@@ -205,7 +215,7 @@ class IcebergConverter(spark: SparkSession)
       snapshotToConvert: Snapshot, txn: OptimisticTransactionImpl): Option[(Long, Long)] = {
     try {
       txn.catalogTable match {
-        case Some(table) => convertSnapshot(snapshotToConvert, Some(txn), table)
+        case Some(table) => convertSnapshotWithRetry(snapshotToConvert, Some(txn), table)
         case _ =>
           val msg = s"CatalogTable for table ${snapshotToConvert.deltaLog.tableId} " +
             s"is empty in txn. Skip iceberg conversion."
@@ -227,6 +237,41 @@ class IcebergConverter(spark: SparkSession)
     }
   }
 
+  /**
+   * Convert the specified snapshot into Iceberg with retry
+   */
+  private def convertSnapshotWithRetry(
+      snapshotToConvert: Snapshot,
+      txnOpt: Option[OptimisticTransactionImpl],
+      catalogTable: CatalogTable,
+      maxRetry: Int =
+        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_UNIFORM_ICEBERG_RETRY_TIMES)
+  ): Option[(Long, Long)] = {
+    var retryAttempt = 0
+    while (retryAttempt < maxRetry) {
+      try {
+        return convertSnapshot(snapshotToConvert, txnOpt, catalogTable)
+      } catch {
+        case e: CommitFailedException if retryAttempt < maxRetry =>
+          retryAttempt += 1
+          val lastConvertedIcebergTable = loadIcebergTable(snapshotToConvert, catalogTable)
+          val lastDeltaVersionConverted = IcebergConverter
+            .getLastConvertedDeltaVersion(lastConvertedIcebergTable)
+          val lastConvertedDeltaTimestamp = IcebergConverter
+            .getLastConvertedDeltaTimestamp(lastConvertedIcebergTable)
+          // Do not retry if the current or higher Delta version is already converted
+          (lastDeltaVersionConverted, lastConvertedDeltaTimestamp) match {
+            case (Some(version), Some(timestamp)) if version >= snapshotToConvert.version =>
+              return Some(version, timestamp)
+            case _ =>
+              logWarning(s"CommitFailedException when converting to Iceberg metadata;" +
+                s" retry count $retryAttempt", e)
+          }
+      }
+    }
+    throw new IllegalStateException("should not happen")
+  }
+
   /**
    * Convert the specified snapshot into Iceberg. NOTE: This operation is blocking. Call
    * enqueueSnapshotForConversion to run the operation asynchronously.
@@ -288,7 +333,7 @@ class IcebergConverter(spark: SparkSession)
     }
 
     val icebergTxn = new IcebergConversionTransaction(
-      cleanedCatalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp,
+      spark, cleanedCatalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp,
       lastConvertedIcebergSnapshotId, lastDeltaVersionConverted)
 
     // Write out the actions taken since the last conversion (or since table creation).
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index e8d3e07fecc..5415096ab1e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1412,6 +1412,22 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(false)
 
+  val DELTA_UNIFORM_ICEBERG_RETRY_TIMES =
+    buildConf("uniform.iceberg.retry.times")
+      .doc("The number of retries iceberg conversions should have in case " +
+        "of failures")
+      .internal()
+      .intConf
+      .createWithDefault(3)
+
+  val DELTA_UNIFORM_ICEBERG_INCLUDE_BASE_CONVERTED_VERSION =
+    buildConf("uniform.iceberg.include.base.converted.version")
+      .doc("If true, include the base converted delta version as a tbl property in Iceberg " +
+        "metadata to indicate the delta version that the conversion started from")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_OPTIMIZE_MIN_FILE_SIZE =
     buildConf("optimize.minFileSize")
       .internal()
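For anyone trying this out, both new knobs are internal but session-settable. A hypothetical example, assuming the standard `spark.databricks.delta.` prefix that `DeltaSQLConfBase.buildConf` prepends to key names:

```scala
// Hypothetical tuning; key prefix assumed from DeltaSQLConfBase.buildConf.
// Allow up to 5 conversion attempts on CommitFailedException (default is 3).
spark.conf.set("spark.databricks.delta.uniform.iceberg.retry.times", "5")
// Stamping base-delta-version is already on by default; shown for completeness.
spark.conf.set("spark.databricks.delta.uniform.iceberg.include.base.converted.version", "true")
```

Note the short-circuit in `convertSnapshotWithRetry`: before retrying, it reloads the Iceberg table, and if a concurrent conversion has already committed the same or a newer Delta version, it returns that version/timestamp pair instead of retrying.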