From a222853fbe612500355c6f56488c1a4d542dfedc Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Apr 2024 20:51:29 +0800 Subject: [PATCH 1/3] [fix] [broker] Fix NPE which will leading dispatching stuck --- ...tStickyKeyDispatcherMultipleConsumers.java | 3 +++ .../client/api/KeySharedSubscriptionTest.java | 19 +++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index ee2ebd7ca867e..b0203589b3b4b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -501,6 +501,9 @@ protected synchronized NavigableSet filterOutEntriesWillBeDiscarde */ @Override protected boolean hasConsumersNeededNormalRead() { + if (MapUtils.isEmpty(recentlyJoinedConsumers)) { + return getFirstAvailableConsumerPermits() > 0; + } for (Consumer consumer : consumerList) { if (consumer == null || consumer.isBlocked()) { continue; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 7219555050839..3ca90a75b310d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -61,6 +61,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -1741,6 +1742,14 @@ public void testNoRepeatedReadAndDiscard() throws Exception { admin.topics().delete(topic, false); } + @DataProvider(name = "allowKeySharedOutOfOrder") + public Object[][] allowKeySharedOutOfOrder() { + return new Object[][]{ + {true}, + {false} + }; + } + /** * This test is in order to guarantee the feature added by https://github.com/apache/pulsar/pull/7105. * 1. Start 3 consumers: @@ -1755,8 +1764,8 @@ public void testNoRepeatedReadAndDiscard() throws Exception { * - no repeated Read-and-discard. * - at last, all messages will be received. */ - @Test(timeOut = 180 * 1000) // the test will be finished in 60s. - public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception { + @Test(timeOut = 180 * 1000, dataProvider = "allowKeySharedOutOfOrder") // the test will be finished in 60s. + public void testRecentJoinedPosWillNotStuckOtherConsumer(boolean allowKeySharedOutOfOrder) throws Exception { final int messagesSentPerTime = 100; final Set totalReceivedMessages = new TreeSet<>(); final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); @@ -1775,6 +1784,8 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception { log.info("Published message :{}", messageId); } + KeySharedPolicy keySharedPolicy = KeySharedPolicy.autoSplitHashRange() + .setAllowOutOfOrderDelivery(allowKeySharedOutOfOrder); // 1. Start 3 consumers and make ack holes. // - one consumer will be closed and trigger a messages redeliver. // - one consumer will not ack any messages to make the new consumer joined late will be stuck due to the @@ -1785,18 +1796,21 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception { .subscriptionName(subName) .receiverQueueSize(10) .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(keySharedPolicy) .subscribe(); Consumer consumer2 = pulsarClient.newConsumer(Schema.INT32) .topic(topic) .subscriptionName(subName) .receiverQueueSize(10) .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(keySharedPolicy) .subscribe(); Consumer consumer3 = pulsarClient.newConsumer(Schema.INT32) .topic(topic) .subscriptionName(subName) .receiverQueueSize(10) .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(keySharedPolicy) .subscribe(); List msgList1 = new ArrayList<>(); List msgList2 = new ArrayList<>(); @@ -1845,6 +1859,7 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception { .subscriptionName(subName) .receiverQueueSize(1000) .subscriptionType(SubscriptionType.Key_Shared) + .keySharedPolicy(keySharedPolicy) .subscribe(); consumerWillBeClose.close(); From 47bcc33b65b37eecc76c37d116798cec2b618892 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Apr 2024 21:22:32 +0800 Subject: [PATCH 2/3] add comments --- ...ersistentStickyKeyDispatcherMultipleConsumers.java | 11 +++++++++-- .../pulsar/client/api/KeySharedSubscriptionTest.java | 1 - 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index b0203589b3b4b..21908a20ffb57 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -457,6 +457,11 @@ private int getAvailablePermits(Consumer c) { @Override protected synchronized NavigableSet filterOutEntriesWillBeDiscarded(NavigableSet src) { + // The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()", + // So skip this filter out. + if (!isAllowOutOfOrderDelivery()) { + return src; + } if (src.isEmpty()) { return src; } @@ -501,8 +506,10 @@ protected synchronized NavigableSet filterOutEntriesWillBeDiscarde */ @Override protected boolean hasConsumersNeededNormalRead() { - if (MapUtils.isEmpty(recentlyJoinedConsumers)) { - return getFirstAvailableConsumerPermits() > 0; + // The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()", + // So the method "filterOutEntriesWillBeDiscarded" will filter out nothing, just return "true" here. + if (!isAllowOutOfOrderDelivery()) { + return true; } for (Consumer consumer : consumerList) { if (consumer == null || consumer.isBlocked()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 3ca90a75b310d..27aa98597ec12 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -61,7 +61,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; From 754e76211db1b3cde9273b565b03d82cf4007f03 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Apr 2024 21:23:14 +0800 Subject: [PATCH 3/3] add comments --- .../PersistentStickyKeyDispatcherMultipleConsumers.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 21908a20ffb57..2df9f38531f5d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -459,7 +459,7 @@ private int getAvailablePermits(Consumer c) { protected synchronized NavigableSet filterOutEntriesWillBeDiscarded(NavigableSet src) { // The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()", // So skip this filter out. - if (!isAllowOutOfOrderDelivery()) { + if (isAllowOutOfOrderDelivery()) { return src; } if (src.isEmpty()) { @@ -508,7 +508,7 @@ protected synchronized NavigableSet filterOutEntriesWillBeDiscarde protected boolean hasConsumersNeededNormalRead() { // The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()", // So the method "filterOutEntriesWillBeDiscarded" will filter out nothing, just return "true" here. - if (!isAllowOutOfOrderDelivery()) { + if (isAllowOutOfOrderDelivery()) { return true; } for (Consumer consumer : consumerList) {