From 4689ae81e399b50f6b71633e3f6ce99e579ea534 Mon Sep 17 00:00:00 2001 From: Hideaki Oguni <22386882+izumo27@users.noreply.github.com> Date: Sun, 21 Jul 2024 19:17:20 +0900 Subject: [PATCH 1/2] fix test of nack by messageid --- .../apache/pulsar/client/impl/NegativeAcksTest.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index a6b77a1c72775..a41b7f05a8eb3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -134,7 +134,7 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti Set sentMessages = new HashSet<>(); final int N = 10; - for (int i = 0; i < N; i++) { + for (int i = 0; i < N * 2; i++) { String value = "test-" + i; producer.sendAsync(value); sentMessages.add(value); @@ -146,13 +146,18 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti consumer.negativeAcknowledge(msg); } + for (int i = 0; i < N; i++) { + Message msg = consumer.receive(); + consumer.negativeAcknowledge(msg.getMessageId()); + } + assertTrue(consumer instanceof ConsumerBase); assertEquals(((ConsumerBase) consumer).getUnAckedMessageTracker().size(), 0); Set receivedMessages = new HashSet<>(); // All the messages should be received again - for (int i = 0; i < N; i++) { + for (int i = 0; i < N * 2; i++) { Message msg = consumer.receive(); receivedMessages.add(msg.getValue()); consumer.acknowledge(msg); @@ -310,9 +315,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { assertEquals(unAckedMessageTracker.size(), 0); negativeAcksTracker.close(); // negative batch message id - unAckedMessageTracker.add(batchMessageId); - unAckedMessageTracker.add(batchMessageId2); - unAckedMessageTracker.add(batchMessageId3); + unAckedMessageTracker.add(messageId); consumer.negativeAcknowledge(batchMessageId); consumer.negativeAcknowledge(batchMessageId2); consumer.negativeAcknowledge(batchMessageId3); From 8ad4d2e70a8316870080e954074b3fcc6dd937f1 Mon Sep 17 00:00:00 2001 From: Hideaki Oguni <22386882+izumo27@users.noreply.github.com> Date: Mon, 22 Jul 2024 10:24:45 +0900 Subject: [PATCH 2/2] fix nack by messageid --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 6ddb0e1bc01db..1806d13493b2f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -811,7 +811,7 @@ public void negativeAcknowledge(MessageId messageId) { negativeAcksTracker.add(messageId); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" - unAckedMessageTracker.remove(messageId); + unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId)); } @Override