diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index a21aff3b365a2..36e88d91b3a83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -51,7 +51,6 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.NamespaceOperation; @@ -449,35 +448,35 @@ public void getListFromBundle( bundleRange); asyncResponse.resume(Response.noContent().build()); } else { - NamespaceBundle nsBundle; - try { - nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, - bundleRange, true, true); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - return; - } - try { - ConcurrentOpenHashMap> bundleTopics = - pulsar().getBrokerService().getMultiLayerTopicsMap().get(namespaceName.toString()); - if (bundleTopics == null || bundleTopics.isEmpty()) { - asyncResponse.resume(Collections.emptyList()); - return; - } - final List topicList = new ArrayList<>(); - String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange(); - ConcurrentOpenHashMap topicMap = bundleTopics.get(bundleKey); - if (topicMap != null) { - topicList.addAll(topicMap.keys().stream() - .filter(name -> !TopicName.get(name).isPersistent()) - .collect(Collectors.toList())); - } - asyncResponse.resume(topicList); - } catch (Exception e) { - log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), - namespaceName, bundleRange, e); - asyncResponse.resume(new RestException(e)); - } + validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true, true) + .thenAccept(nsBundle -> { + ConcurrentOpenHashMap> bundleTopics = + pulsar().getBrokerService() + .getMultiLayerTopicsMap().get(namespaceName.toString()); + if (bundleTopics == null || bundleTopics.isEmpty()) { + asyncResponse.resume(Collections.emptyList()); + return; + } + final List topicList = new ArrayList<>(); + String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange(); + ConcurrentOpenHashMap topicMap = bundleTopics.get(bundleKey); + if (topicMap != null) { + topicList.addAll(topicMap.keys().stream() + .filter(name -> !TopicName.get(name).isPersistent()) + .collect(Collectors.toList())); + } + asyncResponse.resume(topicList); + }).exceptionally(ex -> { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), + namespaceName, bundleRange, realCause); + if (realCause instanceof WebApplicationException) { + asyncResponse.resume(realCause); + } else { + asyncResponse.resume(new RestException(realCause)); + } + return null; + }); } }).exceptionally(ex -> { log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 3d23d7812543a..bfb94aa7740a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -717,19 +717,19 @@ public CompletableFuture validateBundleOwnershipAsync(NamespaceBundle bund throw new RestException(Status.PRECONDITION_FAILED, "Failed to find ownership for ServiceUnit:" + bundle.toString()); } - // If the load manager is extensible load manager, we don't need check the authoritative. - if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) { - return CompletableFuture.completedFuture(null); - } return nsService.isServiceUnitOwnedAsync(bundle) .thenAccept(owned -> { if (!owned) { boolean newAuthoritative = this.isLeaderBroker(); // Replace the host and port of the current request and redirect - URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost()) - .port(webUrl.get().getPort()).replaceQueryParam("authoritative", - newAuthoritative).replaceQueryParam("destinationBroker", - null).build(); + UriBuilder uriBuilder = UriBuilder.fromUri(uri.getRequestUri()) + .host(webUrl.get().getHost()) + .port(webUrl.get().getPort()) + .replaceQueryParam("authoritative", newAuthoritative); + if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) { + uriBuilder.replaceQueryParam("destinationBroker", null); + } + URI redirect = uriBuilder.build(); log.debug("{} is not a service unit owned", bundle); // Redirect log.debug("Redirecting the rest call to {}", redirect); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 1f4e8709d9d99..d348347515f03 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -964,7 +964,10 @@ public void testDisableBroker() throws Exception { defaultConf.setAllowAutoTopicCreation(true); defaultConf.setForceDeleteNamespaceAllowed(true); defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); defaultConf.setLoadBalancerSheddingEnabled(false); + defaultConf.setLoadBalancerDebugModeEnabled(true); + defaultConf.setTopicLevelPoliciesEnabled(false); try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) { var pulsar3 = additionalPulsarTestContext.getPulsarService(); ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl) @@ -1005,7 +1008,7 @@ public void testDisableBroker() throws Exception { @Test(timeOut = 30 * 1000) public void testListTopic() throws Exception { final String namespace = "public/testListTopic"; - admin.namespaces().createNamespace(namespace, 3); + admin.namespaces().createNamespace(namespace, 9); final String persistentTopicName = TopicName.get( "persistent", NamespaceName.get(namespace), @@ -1014,8 +1017,8 @@ public void testListTopic() throws Exception { final String nonPersistentTopicName = TopicName.get( "non-persistent", NamespaceName.get(namespace), "get_topics_mode_" + UUID.randomUUID()).toString(); - admin.topics().createPartitionedTopic(persistentTopicName, 3); - admin.topics().createPartitionedTopic(nonPersistentTopicName, 3); + admin.topics().createPartitionedTopic(persistentTopicName, 9); + admin.topics().createPartitionedTopic(nonPersistentTopicName, 9); pulsarClient.newProducer().topic(persistentTopicName).create().close(); pulsarClient.newProducer().topic(nonPersistentTopicName).create().close(); @@ -1033,10 +1036,10 @@ public void testListTopic() throws Exception { assertFalse(TopicName.get(s).isPersistent()); } } - assertEquals(topicNum, 3); + assertEquals(topicNum, 9); List list = admin.topics().getList(namespace); - assertEquals(list.size(), 6); + assertEquals(list.size(), 18); admin.namespaces().deleteNamespace(namespace, true); }