Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) #22407

Merged
merged 1 commit into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.manager;

import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
Expand Down Expand Up @@ -88,14 +91,27 @@ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,

@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
ServiceUnitState state = ServiceUnitStateData.state(data);

if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) {
if (log.isDebugEnabled()) {
log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit);
}
return;
}

if (t != null) {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t);
}
this.complete(serviceUnit, t);
return;
}
ServiceUnitState state = ServiceUnitStateData.state(data);

if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {}", data, serviceUnit);
}

switch (state) {
case Free, Owned -> this.complete(serviceUnit, t);
default -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2183,9 +2183,18 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(
disconnectClients, closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
: CompletableFuture.completedFuture(null))
.exceptionally(e -> {
if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
&& e.getMessage().contains("Please redo the lookup")) {
log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
return null;
}
throw FutureUtil.wrapToCompletionException(e);
}));
}
});

if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
&& serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
TransactionMetadataStoreService metadataStoreService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,11 +610,16 @@ protected CompletableFuture<Boolean> isBundleOwnedByAnyBroker(NamespaceName fqnn
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
NamespaceService nsService = pulsar().getNamespaceService();

if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return nsService.checkOwnershipPresentAsync(nsBundle);
}

LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(isRequestHttps())
.readOnly(true)
.loadTopicsInBundle(false).build();

return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -66,13 +65,10 @@ protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {

protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
conf.setForceDeleteNamespaceAllowed(true);
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerSheddingEnabled(false);
conf.setLoadBalancerDebugModeEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
return conf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -263,6 +264,32 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
assertTrue(brokerLookupData.isPresent());
}

@Test(timeOut = 30 * 1000)
public void testUnloadUponTopicLookupFailure() throws Exception {
TopicName topicName =
TopicName.get("public/test/testUnloadUponTopicLookupFailure");
NamespaceBundle bundle = pulsar1.getNamespaceService().getBundle(topicName);
primaryLoadManager.assign(Optional.empty(), bundle).get();

CompletableFuture future1 = new CompletableFuture();
CompletableFuture future2 = new CompletableFuture();
try {
pulsar1.getBrokerService().getTopics().put(topicName.toString(), future1);
pulsar2.getBrokerService().getTopics().put(topicName.toString(), future2);
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS).execute(() -> {
future1.completeExceptionally(new CompletionException(
new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
future2.completeExceptionally(new CompletionException(
new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
});
admin.namespaces().unloadNamespaceBundle(bundle.getNamespaceObject().toString(), bundle.getBundleRange());
} finally {
pulsar1.getBrokerService().getTopics().remove(topicName.toString());
pulsar2.getBrokerService().getTopics().remove(topicName.toString());
}
}


@Test(timeOut = 30 * 1000)
public void testUnloadAdminAPI() throws Exception {
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-unload");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,53 +94,59 @@ public void testTimeout() throws IllegalAccessException {
public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException {
UnloadCounter counter = new UnloadCounter();
UnloadManager manager = new UnloadManager(counter);
String dstBroker = "broker-2";
String srcBroker = "broker-1";
String bundle = "bundle-1";
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin);
new UnloadDecision(new Unload(srcBroker, bundle), Success, Admin);
CompletableFuture<Void> future =
manager.waitAsync(CompletableFuture.completedFuture(null),
"bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
bundle, unloadDecision, 5, TimeUnit.SECONDS);
Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);

assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Assigning, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Assigning, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Deleted, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Deleted, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Splitting, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Releasing, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Init, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Free, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);

// Success with Owned state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
"bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();

future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);
}

Expand All @@ -158,7 +164,7 @@ public void testFailedStage() throws IllegalAccessException {
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT),
new ServiceUnitStateData(ServiceUnitState.Owned, null, "broker-1", VERSION_ID_INIT),
new IllegalStateException("Failed stage."));

try {
Expand Down
Loading