From dd022fa07ffac5872a47befbe372e86464b491f5 Mon Sep 17 00:00:00 2001 From: Hideaki Oguni <22386882+izumo27@users.noreply.github.com> Date: Mon, 29 Jul 2024 16:29:59 +0900 Subject: [PATCH] [fix][client] Fix negative acknowledgement by messageId (#23060) (cherry picked from commit d4bbf10f58771e2d43e576dc3422e502834b1de4) (cherry picked from commit 02f3ecc1d20cda17edd4c308f401b3c15463753c) --- .../apache/pulsar/client/impl/NegativeAcksTest.java | 13 ++++++++----- .../org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index a6b77a1c72775..09b9b8b2fabdb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -134,7 +134,7 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti Set sentMessages = new HashSet<>(); final int N = 10; - for (int i = 0; i < N; i++) { + for (int i = 0; i < N * 2; i++) { String value = "test-" + i; producer.sendAsync(value); sentMessages.add(value); @@ -145,6 +145,11 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti Message msg = consumer.receive(); consumer.negativeAcknowledge(msg); } + for (int i = 0; i < N; i++) { + Message msg = consumer.receive(); + consumer.negativeAcknowledge(msg.getMessageId()); + } + assertTrue(consumer instanceof ConsumerBase); assertEquals(((ConsumerBase) consumer).getUnAckedMessageTracker().size(), 0); @@ -152,7 +157,7 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti Set receivedMessages = new HashSet<>(); // All the messages should be received again - for (int i = 0; i < N; i++) { + for (int i = 0; i < N * 2; i++) { Message msg = consumer.receive(); receivedMessages.add(msg.getValue()); consumer.acknowledge(msg); @@ -310,9 +315,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { assertEquals(unAckedMessageTracker.size(), 0); negativeAcksTracker.close(); // negative batch message id - unAckedMessageTracker.add(batchMessageId); - unAckedMessageTracker.add(batchMessageId2); - unAckedMessageTracker.add(batchMessageId3); + unAckedMessageTracker.add(messageId); consumer.negativeAcknowledge(batchMessageId); consumer.negativeAcknowledge(batchMessageId2); consumer.negativeAcknowledge(batchMessageId3); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 75326336b4446..844f7f54a62a8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -762,7 +762,7 @@ public void negativeAcknowledge(MessageId messageId) { negativeAcksTracker.add(messageId); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" - unAckedMessageTracker.remove(messageId); + unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId)); } @Override