Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

KAFKA-18691: Flaky test testFencingOnTransactionExpiration #18793

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions core/src/test/scala/integration/kafka/api/TransactionsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ class TransactionsTest extends IntegrationTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testFencingOnTransactionExpiration(quorum: String, groupProtocol: String): Unit = {
val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 100)
val producer = createTransactionalProducer("expiringProducer", transactionTimeoutMs = 300)

producer.initTransactions()
producer.beginTransaction()
Expand All @@ -617,20 +617,16 @@ class TransactionsTest extends IntegrationTestHarness {
// Wait for the expiration cycle to kick in.
Thread.sleep(600)

TestUtils.waitUntilTrue(() => {
var foundException = false
try {
// Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException. We may see some concurrentTransactionsExceptions.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised an error due to concurrent transactions or invalid producer epoch")
} catch {
case _: ConcurrentTransactionsException =>
case _: InvalidProducerEpochException =>
case e: ExecutionException =>
foundException = e.getCause.isInstanceOf[InvalidProducerEpochException]
}
foundException
}, "Never returned the expected InvalidProducerEpochException")
try {
// Now that the transaction has expired, the second send should fail with a InvalidProducerEpochException. We may see some concurrentTransactionsExceptions.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "2", "2", willBeCommitted = false)).get()
fail("should have raised an error due to concurrent transactions or invalid producer epoch")
} catch {
case _: ConcurrentTransactionsException =>
case _: InvalidProducerEpochException =>
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException], "Error was " + e.getCause + " and not InvalidProducerEpochException")
}

// Verify that the first message was aborted and the second one was never written at all.
val nonTransactionalConsumer = nonTransactionalConsumers.head
Expand Down