Skip to content

Commit

Permalink
Add more type widening tests
Browse files Browse the repository at this point in the history
  • Loading branch information
johanl-db committed May 7, 2024
1 parent 7f199fe commit d5ed864
Show file tree
Hide file tree
Showing 6 changed files with 328 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.file.FileAlreadyExistsException
import java.util.{ConcurrentModificationException, UUID}
import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.util.control.NonFatal
Expand Down Expand Up @@ -238,7 +239,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
* Tracks the data that could have been seen by recording the partition
* predicates by which files have been queried by this transaction.
*/
protected val readPredicates = new ArrayBuffer[DeltaTableReadPredicate]
protected val readPredicates =
new java.util.concurrent.ConcurrentLinkedQueue[DeltaTableReadPredicate]

/** Tracks specific files that have been seen by this transaction. */
protected val readFiles = new HashSet[AddFile]
Expand Down Expand Up @@ -884,10 +886,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
}

readPredicates += DeltaTableReadPredicate(
readPredicates.add(DeltaTableReadPredicate(
partitionPredicates = partitionFilters,
dataPredicates = dataFilters,
shouldRewriteFilter = shouldRewriteFilter)
)
}

/**
Expand Down Expand Up @@ -1127,7 +1130,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
DomainMetadataUtils.validateDomainMetadataSupportedAndNoDuplicate(finalActions, protocol)

isBlindAppend = {
val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
val dependsOnFiles = !readPredicates.isEmpty || readFiles.nonEmpty
val onlyAddFiles =
preparedActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
onlyAddFiles && !dependsOnFiles
Expand Down Expand Up @@ -1163,7 +1166,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
val currentTransactionInfo = CurrentTransactionInfo(
txnId = txnId,
readPredicates = readPredicates.toSeq,
readPredicates = readPredicates.asScala.toSeq,
readFiles = readFiles.toSet,
readWholeTable = readTheWholeTable,
readAppIds = readTxn.toSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,7 @@ case class CommitInfo(
// infer the commit version from the file name and fill in this field then.
@JsonDeserialize(contentAs = classOf[java.lang.Long])
version: Option[Long],
@JsonDeserialize(contentAs = classOf[java.lang.Long])
inCommitTimestamp: Option[Long],
timestamp: Timestamp,
userId: Option[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ trait DeltaLogging
: Unit = {
if (Utils.isTesting) {
assert(check, msg)
} else {
} else if (!check) {
recordDeltaEvent(
deltaLog = deltaLog,
opType = s"delta.assertions.$name",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,34 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
assert(Action.fromJson(json1) === expectedCommitInfo)
}

test("deserialization of CommitInfo with a very small ICT") {
val json1 =
"""{"commitInfo":{"inCommitTimestamp":123,"timestamp":123,"operation":"CONVERT",""" +
""""operationParameters":{},"readVersion":23,""" +
""""isolationLevel":"SnapshotIsolation","isBlindAppend":true,""" +
""""operationMetrics":{"m1":"v1","m2":"v2"},"userMetadata":"123"}}""".stripMargin
assert(Action.fromJson(json1).asInstanceOf[CommitInfo].inCommitTimestamp.get == 123L)
}

test("deserialization of CommitInfo with a very large ICT") {
val json1 =
"""{"commitInfo":{"inCommitTimestamp":123333333,"timestamp":123,"operation":"CONVERT",""" +
""""operationParameters":{},"readVersion":23,""" +
""""isolationLevel":"SnapshotIsolation","isBlindAppend":true,""" +
""""operationMetrics":{"m1":"v1","m2":"v2"},"userMetadata":"123"}}""".stripMargin
assert(Action.fromJson(json1).asInstanceOf[CommitInfo].inCommitTimestamp.get == 123333333L)
}

test("deserialization of CommitInfo with missing ICT") {
val json1 =
"""{"commitInfo":{"timestamp":123,"operation":"CONVERT",""" +
""""operationParameters":{},"readVersion":23,""" +
""""isolationLevel":"SnapshotIsolation","isBlindAppend":true,""" +
""""operationMetrics":{"m1":"v1","m2":"v2"},"userMetadata":"123"}}""".stripMargin
val ictOpt: Option[Long] = Action.fromJson(json1).asInstanceOf[CommitInfo].inCommitTimestamp
assert(ictOpt.isEmpty)
}

testActionSerDe(
"Protocol - json serialization/deserialization",
Protocol(minReaderVersion = 1, minWriterVersion = 2),
Expand Down
Loading

0 comments on commit d5ed864

Please # to comment.