Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[fix][broker] Fix NPE causing dispatching to stop when using Key_Shared mode and allowOutOfOrderDelivery=true #22533

Merged
merged 3 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,11 @@ private int getAvailablePermits(Consumer c) {

@Override
protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
// The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()",
// So skip this filter out.
if (isAllowOutOfOrderDelivery()) {
return src;
}
if (src.isEmpty()) {
return src;
}
Expand Down Expand Up @@ -501,6 +506,11 @@ protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarde
*/
@Override
protected boolean hasConsumersNeededNormalRead() {
// The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()",
// So the method "filterOutEntriesWillBeDiscarded" will filter out nothing, just return "true" here.
if (isAllowOutOfOrderDelivery()) {
return true;
}
for (Consumer consumer : consumerList) {
if (consumer == null || consumer.isBlocked()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,14 @@ public void testNoRepeatedReadAndDiscard() throws Exception {
admin.topics().delete(topic, false);
}

@DataProvider(name = "allowKeySharedOutOfOrder")
public Object[][] allowKeySharedOutOfOrder() {
return new Object[][]{
{true},
{false}
};
}

/**
* This test is in order to guarantee the feature added by https://github.com/apache/pulsar/pull/7105.
* 1. Start 3 consumers:
Expand All @@ -1755,8 +1763,8 @@ public void testNoRepeatedReadAndDiscard() throws Exception {
* - no repeated Read-and-discard.
* - at last, all messages will be received.
*/
@Test(timeOut = 180 * 1000) // the test will be finished in 60s.
public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
@Test(timeOut = 180 * 1000, dataProvider = "allowKeySharedOutOfOrder") // the test will be finished in 60s.
public void testRecentJoinedPosWillNotStuckOtherConsumer(boolean allowKeySharedOutOfOrder) throws Exception {
final int messagesSentPerTime = 100;
final Set<Integer> totalReceivedMessages = new TreeSet<>();
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Expand All @@ -1775,6 +1783,8 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
log.info("Published message :{}", messageId);
}

KeySharedPolicy keySharedPolicy = KeySharedPolicy.autoSplitHashRange()
.setAllowOutOfOrderDelivery(allowKeySharedOutOfOrder);
// 1. Start 3 consumers and make ack holes.
// - one consumer will be closed and trigger a messages redeliver.
// - one consumer will not ack any messages to make the new consumer joined late will be stuck due to the
Expand All @@ -1785,18 +1795,21 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(keySharedPolicy)
.subscribe();
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(keySharedPolicy)
.subscribe();
Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(keySharedPolicy)
.subscribe();
List<Message> msgList1 = new ArrayList<>();
List<Message> msgList2 = new ArrayList<>();
Expand Down Expand Up @@ -1845,6 +1858,7 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
.subscriptionName(subName)
.receiverQueueSize(1000)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(keySharedPolicy)
.subscribe();
consumerWillBeClose.close();

Expand Down
Loading