Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[improve][broker] Make get list from bundle Admin API async #20652

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
Expand Down Expand Up @@ -449,35 +448,35 @@ public void getListFromBundle(
bundleRange);
asyncResponse.resume(Response.noContent().build());
} else {
NamespaceBundle nsBundle;
try {
nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles,
bundleRange, true, true);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
return;
}
try {
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics =
pulsar().getBrokerService().getMultiLayerTopicsMap().get(namespaceName.toString());
if (bundleTopics == null || bundleTopics.isEmpty()) {
asyncResponse.resume(Collections.emptyList());
return;
}
final List<String> topicList = new ArrayList<>();
String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange();
ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey);
if (topicMap != null) {
topicList.addAll(topicMap.keys().stream()
.filter(name -> !TopicName.get(name).isPersistent())
.collect(Collectors.toList()));
}
asyncResponse.resume(topicList);
} catch (Exception e) {
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange, e);
asyncResponse.resume(new RestException(e));
}
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true, true)
.thenAccept(nsBundle -> {
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics =
pulsar().getBrokerService()
.getMultiLayerTopicsMap().get(namespaceName.toString());
if (bundleTopics == null || bundleTopics.isEmpty()) {
asyncResponse.resume(Collections.emptyList());
return;
}
final List<String> topicList = new ArrayList<>();
String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange();
ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey);
if (topicMap != null) {
topicList.addAll(topicMap.keys().stream()
.filter(name -> !TopicName.get(name).isPersistent())
.collect(Collectors.toList()));
}
asyncResponse.resume(topicList);
}).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange, realCause);
if (realCause instanceof WebApplicationException) {
asyncResponse.resume(realCause);
} else {
asyncResponse.resume(new RestException(realCause));
}
return null;
});
}
}).exceptionally(ex -> {
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,19 +717,19 @@ public CompletableFuture<Void> validateBundleOwnershipAsync(NamespaceBundle bund
throw new RestException(Status.PRECONDITION_FAILED,
"Failed to find ownership for ServiceUnit:" + bundle.toString());
}
// If the load manager is extensible load manager, we don't need check the authoritative.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
return CompletableFuture.completedFuture(null);
}
return nsService.isServiceUnitOwnedAsync(bundle)
.thenAccept(owned -> {
if (!owned) {
boolean newAuthoritative = this.isLeaderBroker();
// Replace the host and port of the current request and redirect
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
.port(webUrl.get().getPort()).replaceQueryParam("authoritative",
newAuthoritative).replaceQueryParam("destinationBroker",
null).build();
UriBuilder uriBuilder = UriBuilder.fromUri(uri.getRequestUri())
.host(webUrl.get().getHost())
.port(webUrl.get().getPort())
.replaceQueryParam("authoritative", newAuthoritative);
if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
Copy link
Contributor

@gaoran10 gaoran10 Jun 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add this condition? And I'm not sure why need to reset the query param "destinationBroker".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for the extensible load manager, the transfer operation needs to use destinationBroker as the destination broker.

Copy link
Contributor

@gaoran10 gaoran10 Jun 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks. Before, the param destinationBroker was only used by the bundle unload command, so it's a little confusing.

The bundle unload command with the param destinationBroker can't work with the extensible load manager, right?

Copy link
Member Author

@Demogorgon314 Demogorgon314 Jun 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before, the destinationBroker is set in the leader broker's ModularLoadManagerImpl#bundleBrokerAffinityMap, so it requires redirecting to the leader and setting the destinationBroker before doing in the real unload operation.

The unload operation requires execution in the owner broker, which will redirect to the owner broker after setting the destinationBroker success. If we don't clean the destinationBroker parameter, it will fall into a loop.

The ExtensibleLoadManagerImpl used a different way to transfer the ownership to destinationBroker, so we do not need to clean the destinationBroker parameter when redirecting. Since the destinationBroker do not set in the leader broker.

For the detail, you can check this method: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L954

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ppl will appreciate it if you can add these details as a comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks.

uriBuilder.replaceQueryParam("destinationBroker", null);
}
URI redirect = uriBuilder.build();
log.debug("{} is not a service unit owned", bundle);
// Redirect
log.debug("Redirecting the rest call to {}", redirect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,10 @@ public void testDisableBroker() throws Exception {
defaultConf.setAllowAutoTopicCreation(true);
defaultConf.setForceDeleteNamespaceAllowed(true);
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
defaultConf.setLoadBalancerSheddingEnabled(false);
defaultConf.setLoadBalancerDebugModeEnabled(true);
defaultConf.setTopicLevelPoliciesEnabled(false);
try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
var pulsar3 = additionalPulsarTestContext.getPulsarService();
ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl)
Expand Down Expand Up @@ -1005,7 +1008,7 @@ public void testDisableBroker() throws Exception {
@Test(timeOut = 30 * 1000)
public void testListTopic() throws Exception {
final String namespace = "public/testListTopic";
admin.namespaces().createNamespace(namespace, 3);
admin.namespaces().createNamespace(namespace, 9);

final String persistentTopicName = TopicName.get(
"persistent", NamespaceName.get(namespace),
Expand All @@ -1014,8 +1017,8 @@ public void testListTopic() throws Exception {
final String nonPersistentTopicName = TopicName.get(
"non-persistent", NamespaceName.get(namespace),
"get_topics_mode_" + UUID.randomUUID()).toString();
admin.topics().createPartitionedTopic(persistentTopicName, 3);
admin.topics().createPartitionedTopic(nonPersistentTopicName, 3);
admin.topics().createPartitionedTopic(persistentTopicName, 9);
admin.topics().createPartitionedTopic(nonPersistentTopicName, 9);
pulsarClient.newProducer().topic(persistentTopicName).create().close();
pulsarClient.newProducer().topic(nonPersistentTopicName).create().close();

Expand All @@ -1033,10 +1036,10 @@ public void testListTopic() throws Exception {
assertFalse(TopicName.get(s).isPersistent());
}
}
assertEquals(topicNum, 3);
assertEquals(topicNum, 9);

List<String> list = admin.topics().getList(namespace);
assertEquals(list.size(), 6);
assertEquals(list.size(), 18);
admin.namespaces().deleteNamespace(namespace, true);
}

Expand Down