From 951a97d3705939d473967b7f1c97c99f4472f755 Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Wed, 9 Nov 2022 20:04:33 +0100 Subject: [PATCH] Fix metadata updates via actions on first commit. Fixed an issue where you couldn't set the partition scheme on the initial commit via `txn.commit` but doing it via `txn.updateMetadata` followed by `txn.commit` worked fine, even though both submitted the same metadata during the same transaction. GitOrigin-RevId: 804baa968acbc8633cf3f25639331a41cd0a81e9 --- .../sql/delta/OptimisticTransaction.scala | 17 ++++++++++-- .../delta/OptimisticTransactionSuite.scala | 27 +++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 282e7b2b98d..9c59e50529e 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -201,7 +201,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite /** Tracks if this transaction has already committed. */ protected var committed = false - /** Stores the updated metadata (if any) that will result from this txn. */ + /** + * Stores the updated metadata (if any) that will result from this txn. + * + * This is just one way to change metadata. + * New metadata can also be added during commit from actions. + * But metadata should *not* be updated via both paths. + */ protected var newMetadata: Option[Metadata] = None /** Stores the updated protocol (if any) that will result from this txn. */ @@ -935,6 +941,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite assert(!committed, "Transaction already committed.") // If the metadata has changed, add that to the set of actions + // New metadata can come either from `newMetadata` or from the `actions` being committed. 
var finalActions = newMetadata.toSeq ++ actions val metadataChanges = finalActions.collect { case m: Metadata => m } if (metadataChanges.length > 1) { @@ -944,7 +951,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite assert( metadataChanges.length <= 1, "Cannot change the metadata more than once in a transaction.") } - metadataChanges.foreach(m => verifyNewMetadata(m)) + // There can be at most one metadata entry at this point. + metadataChanges.foreach { m => + verifyNewMetadata(m) + // Also update `newMetadata` so that the behaviour later is consistent irrespective of whether + // metadata was set via `updateMetadata` or `actions`. + newMetadata = Some(m) + } finalActions = newProtocol.toSeq ++ finalActions diff --git a/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala index 2d86e00d195..a0dd2a50b2d 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala @@ -431,4 +431,31 @@ class OptimisticTransactionSuite assert(testTxn.preCommitLogSegment.checkpointVersionOpt == Some(10)) } } + + test("can set partition columns in first commit") { + withTempDir { tableDir => + val partitionColumns = Array("part") + val exampleAddFile = AddFile( + path = "test-path", + partitionValues = Map("part" -> "one"), + size = 1234, + modificationTime = 5678, + dataChange = true, + stats = """{"numRecords": 1}""", + tags = Map.empty) + val deltaLog = DeltaLog.forTable(spark, tableDir) + val schema = new StructType() + .add("id", "long") + .add("part", "string") + deltaLog.withNewTransaction { txn => + val protocol = Protocol() + val metadata = Metadata( + schemaString = schema.json, + partitionColumns = partitionColumns) + txn.commit(Seq(protocol, metadata, exampleAddFile), DeltaOperations.ManualUpdate) + } + val snapshot = deltaLog.update() + 
assert(snapshot.metadata.partitionColumns.sameElements(partitionColumns)) + } + } }