Skip to content

Commit

Permalink
Fix metadata updates via actions on first commit.
Browse files Browse the repository at this point in the history
Fixed an issue where you couldn't set the partition scheme on the initial commit via `txn.commit` but doing it via `txn.updateMetadata` followed by `txn.commit` worked fine, even though both submitted the same metadata during the same transaction.

GitOrigin-RevId: 804baa968acbc8633cf3f25639331a41cd0a81e9
  • Loading branch information
larsk-db authored and allisonport-db committed Nov 18, 2022
1 parent e60d8b6 commit 951a97d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
/** Tracks if this transaction has already committed. */
protected var committed = false

/** Stores the updated metadata (if any) that will result from this txn. */
/**
* Stores the updated metadata (if any) that will result from this txn.
*
* This is just one way to change metadata.
* New metadata can also be added during commit from actions.
* But metadata should *not* be updated via both paths.
*/
protected var newMetadata: Option[Metadata] = None

/** Stores the updated protocol (if any) that will result from this txn. */
Expand Down Expand Up @@ -935,6 +941,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
assert(!committed, "Transaction already committed.")

// If the metadata has changed, add that to the set of actions
// New metadata can come either from `newMetadata` or from the `actions` there.
var finalActions = newMetadata.toSeq ++ actions
val metadataChanges = finalActions.collect { case m: Metadata => m }
if (metadataChanges.length > 1) {
Expand All @@ -944,7 +951,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
assert(
metadataChanges.length <= 1, "Cannot change the metadata more than once in a transaction.")
}
metadataChanges.foreach(m => verifyNewMetadata(m))
// There be at most one metadata entry at this point.
metadataChanges.foreach { m =>
verifyNewMetadata(m)
// Also update `newMetadata` so that the behaviour later is consistent irrespective of whether
// metadata was set via `updateMetadata` or `actions`.
newMetadata = Some(m)
}
finalActions = newProtocol.toSeq ++ finalActions


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,4 +431,31 @@ class OptimisticTransactionSuite
assert(testTxn.preCommitLogSegment.checkpointVersionOpt == Some(10))
}
}

test("can set partition columns in first commit") {
withTempDir { tableDir =>
val partitionColumns = Array("part")
val exampleAddFile = AddFile(
path = "test-path",
partitionValues = Map("part" -> "one"),
size = 1234,
modificationTime = 5678,
dataChange = true,
stats = """{"numRecords": 1}""",
tags = Map.empty)
val deltaLog = DeltaLog.forTable(spark, tableDir)
val schema = new StructType()
.add("id", "long")
.add("part", "string")
deltaLog.withNewTransaction { txn =>
val protocol = Protocol()
val metadata = Metadata(
schemaString = schema.json,
partitionColumns = partitionColumns)
txn.commit(Seq(protocol, metadata, exampleAddFile), DeltaOperations.ManualUpdate)
}
val snapshot = deltaLog.update()
assert(snapshot.metadata.partitionColumns.sameElements(partitionColumns))
}
}
}

0 comments on commit 951a97d

Please # to comment.