From 2ab5a8d419d869525b1c110372a7782875caa88b Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Wed, 28 Aug 2024 10:37:39 -0700 Subject: [PATCH] Uniform iceberg conversion transaction should not convert commit with only AddFiles without datachange (#3615) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description The Uniform Iceberg conversion transaction should not convert a commit that contains only AddFiles without data change. Otherwise it will result in duplicate AddFiles in Iceberg. ## How was this patch tested? Manually tested with Spark; a unit test will be added in a future PR. ## Does this PR introduce _any_ user-facing changes? --- .../delta/icebergShaded/IcebergConverter.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala index a4fc84e7cff..cd6eb29a31c 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala @@ -467,6 +467,7 @@ class IcebergConverter(spark: SparkSession) var hasRemoves = false var hasDataChange = false var hasCommitInfo = false + var commitInfo: Option[CommitInfo] = None breakable { for (action <- actionsToCommit) { action match { @@ -476,7 +477,9 @@ class IcebergConverter(spark: SparkSession) case r: RemoveFile => hasRemoves = true if (r.dataChange) hasDataChange = true - case _: CommitInfo => hasCommitInfo = true + case ci: CommitInfo => + commitInfo = Some(ci) + hasCommitInfo = true case _ => // Do nothing } if (hasAdds && hasRemoves && hasDataChange && hasCommitInfo) break // Short-circuit @@ -510,9 +513,14 @@ class IcebergConverter(spark: SparkSession) } overwriteHelper.commit() } else if (hasAdds) { - val appendHelper = 
icebergTxn.getAppendOnlyHelper() - addsAndRemoves.foreach(action => appendHelper.add(action.add)) - appendHelper.commit() + if (!hasRemoves && !hasDataChange && allDeltaActionsCaptured) { + logInfo(s"Skip Iceberg conversion for commit that only has AddFiles " + + s"without any RemoveFiles or data change. CommitInfo: $commitInfo") + } else { + val appendHelper = icebergTxn.getAppendOnlyHelper() + addsAndRemoves.foreach(action => appendHelper.add(action.add)) + appendHelper.commit() + } } else if (hasRemoves) { val removeHelper = icebergTxn.getRemoveOnlyHelper() addsAndRemoves.foreach(action => removeHelper.remove(action.remove))