From bbef1e6c5a925c76167eef7152061c010a7b0e28 Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 14 Dec 2023 13:35:57 +0800 Subject: [PATCH] Improve test --- .../broker/transaction/TransactionTest.java | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index f26c39fc52cbe..1cf0c48b1257a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1871,7 +1871,15 @@ public void testReadCommittedWithCompaction() throws Exception{ .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); producer.newMessage(txn).key("K2").value("V2").send(); producer.newMessage(txn).key("K3").value("V3").send(); - txn.commit(); + txn.commit().get(); + + producer.newMessage().key("K1").value("V4").send(); + + Transaction txn2 = pulsarClient.newTransaction() + .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + producer.newMessage(txn2).key("K2").value("V5").send(); + producer.newMessage(txn2).key("K3").value("V6").send(); + txn2.commit().get(); admin.topics().triggerCompaction(topic); @@ -1879,6 +1887,25 @@ public void testReadCommittedWithCompaction() throws Exception{ assertEquals(admin.topics().compactionStatus(topic).status, LongRunningProcessStatus.Status.SUCCESS); }); + + @Cleanup + Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Exclusive) + .readCompacted(true) + .subscribe(); + List result = new ArrayList<>(); + while (true) { + Message receive = consumer.receive(2, TimeUnit.SECONDS); + if (receive == null) { + break; + } + + result.add(receive.getValue()); + } + + Assert.assertEquals(result, List.of("V4", "V5", "V6")); } }