Skip to content

Commit

Permalink
KAFKA-18691: Flaky test testFencingOnTransactionExpiration (#18793)
Browse files Browse the repository at this point in the history
It appears this test was failing because the transaction was never aborting and the concurrent transactions errors would not go away.

ccab9eb introduced the test failure because it requires the transaction to complete, but I suspect the lack of completion was happening before the change.

The timeout for the write is based on the transactional timeout, and 100ms seemed too small -- thus the requests to update the state would often repeatedly time out.

Also removed the loop since it was not necessary.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Calvin Liu <caliu@confluent.io>
  • Loading branch information
jolshan authored Feb 4, 2025
1 parent b998189 commit 822b8ab
Showing 1 changed file with 11 additions and 15 deletions.
26 changes: 11 additions & 15 deletions core/src/test/scala/integration/kafka/api/TransactionsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ class TransactionsTest extends IntegrationTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testFencingOnTransactionExpiration(quorum: String, groupProtocol: String): Unit = {
val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 100)
val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 300)

producer.initTransactions()
producer.beginTransaction()
Expand All @@ -617,20 +617,16 @@ class TransactionsTest extends IntegrationTestHarness {
// Wait for the expiration cycle to kick in.
Thread.sleep(600)

TestUtils.waitUntilTrue(() => {
var foundException = false
try {
// Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException. We may see some concurrentTransactionsExceptions.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised an error due to concurrent transactions or invalid producer epoch")
} catch {
case _: ConcurrentTransactionsException =>
case _: InvalidProducerEpochException =>
case e: ExecutionException =>
foundException = e.getCause.isInstanceOf[InvalidProducerEpochException]
}
foundException
}, "Never returned the expected InvalidProducerEpochException")
try {
// Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException. We may see some concurrentTransactionsExceptions.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised an error due to concurrent transactions or invalid producer epoch")
} catch {
case _: ConcurrentTransactionsException =>
case _: InvalidProducerEpochException =>
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException], "Error was " + e.getCause + " and not InvalidProducerEpochException")
}

// Verify that the first message was aborted and the second one was never written at all.
val nonTransactionalConsumer = nonTransactionalConsumers.head
Expand Down

0 comments on commit 822b8ab

Please # to comment.