From 3053b647e0ca646b2df9f03815947104cd2e705f Mon Sep 17 00:00:00 2001 From: Kai Wang Date: Sat, 17 Aug 2024 00:17:55 +0800 Subject: [PATCH] [improve][broker] Should notify bundle ownership listener onLoad event when ServiceUnitState start (ExtensibleLoadManagerImpl only) (#23152) --- .../channel/ServiceUnitStateChannelImpl.java | 16 +++++- .../ExtensibleLoadManagerImplTest.java | 50 +++++++++++++++++++ 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index dbe3b88b61f28..1063f8124ece8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -324,7 +324,8 @@ public synchronized void start() throws PulsarServerException { "topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())) .create(); - tableview.listen((key, value) -> handle(key, value)); + tableview.listen(this::handleEvent); + tableview.forEach(this::handleExisting); var strategy = (ServiceUnitStateCompactionStrategy) TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG); if (strategy == null) { String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is null."; @@ -690,7 +691,7 @@ public CompletableFuture publishSplitEventAsync(Split split) { }).thenApply(__ -> null); } - private void handle(String serviceUnit, ServiceUnitStateData data) { + private void handleEvent(String serviceUnit, ServiceUnitStateData data) { long totalHandledRequests = getHandlerTotalCounter(data).incrementAndGet(); if (debug()) { log.info("{} received a handle request for serviceUnit:{}, data:{}. totalHandledRequests:{}", @@ -716,6 +717,17 @@ private void handle(String serviceUnit, ServiceUnitStateData data) { } } + private void handleExisting(String serviceUnit, ServiceUnitStateData data) { + if (debug()) { + log.info("Loaded the service unit state data. serviceUnit: {}, data: {}", serviceUnit, data); + } + ServiceUnitState state = state(data); + if (state.equals(Owned) && isTargetBroker(data.dstBroker())) { + pulsar.getNamespaceService() + .onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit)); + } + } + private static boolean isTransferCommand(ServiceUnitStateData data) { if (data == null) { return false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 69a65caf2943c..51966f420bf25 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -417,6 +417,56 @@ public boolean test(NamespaceBundle namespaceBundle) { } } + @Test(timeOut = 30 * 1000) + public void testNamespaceOwnershipListener() throws Exception { + Pair topicAndBundle = + getBundleIsNotOwnByChangeEventTopic("test-namespace-ownership-listener"); + TopicName topicName = topicAndBundle.getLeft(); + NamespaceBundle bundle = topicAndBundle.getRight(); + + String broker = admin.lookups().lookupTopic(topicName.toString()); + log.info("Assign the bundle {} to {}", bundle, broker); + + checkOwnershipState(broker, bundle); + + AtomicInteger onloadCount = new AtomicInteger(0); + AtomicInteger unloadCount = new AtomicInteger(0); + + NamespaceBundleOwnershipListener listener = new NamespaceBundleOwnershipListener() { + @Override + public void onLoad(NamespaceBundle bundle) { + onloadCount.incrementAndGet(); + } + + @Override + public void unLoad(NamespaceBundle bundle) { + unloadCount.incrementAndGet(); + } + + @Override + public boolean test(NamespaceBundle namespaceBundle) { + return namespaceBundle.equals(bundle); + } + }; + pulsar1.getNamespaceService().addNamespaceBundleOwnershipListener(listener); + pulsar2.getNamespaceService().addNamespaceBundleOwnershipListener(listener); + + // There are a service unit state channel already started, when add listener, it will trigger the onload event. + Awaitility.await().untilAsserted(() -> { + assertEquals(onloadCount.get(), 1); + assertEquals(unloadCount.get(), 0); + }); + + ServiceUnitStateChannelImpl channel = new ServiceUnitStateChannelImpl(pulsar1); + channel.start(); + Awaitility.await().untilAsserted(() -> { + assertEquals(onloadCount.get(), 2); + assertEquals(unloadCount.get(), 0); + }); + + channel.close(); + } + @DataProvider(name = "isPersistentTopicSubscriptionTypeTest") public Object[][] isPersistentTopicSubscriptionTypeTest() { return new Object[][]{