From c024bae234d1489d31d602846410bf8e89686ce0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 31 Jan 2024 01:13:28 +0800 Subject: [PATCH 1/9] [improve] [admin] improve topic stats if not exists --- .../pulsar/broker/service/BrokerService.java | 80 ++++++++++--------- .../pulsar/broker/admin/AdminApi2Test.java | 28 +++++++ 2 files changed, 71 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8654a8300502c..226d8046a3bd1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -969,43 +969,49 @@ public CompletableFuture> getTopic(final TopicName topicName, bo } final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent); if (isPersistentTopic) { - final CompletableFuture> topicPoliciesFuture = - getTopicPoliciesBypassSystemTopic(topicName); - return topicPoliciesFuture.exceptionally(ex -> { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - final String errorInfo = String.format("Topic creation encountered an exception by initialize" - + " topic policies service. topic_name=%s error_message=%s", topicName, rc.getMessage()); - log.error(errorInfo, rc); - throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); - }).thenCompose(optionalTopicPolicies -> { - final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); - return topics.computeIfAbsent(topicName.toString(), (tpName) -> { - if (topicName.isPartitioned()) { - final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); - return fetchPartitionedTopicMetadataAsync(topicNameEntity) - .thenCompose((metadata) -> { - // Allow crate non-partitioned persistent topic that name includes `partition` - if (metadata.partitions == 0 - || topicName.getPartitionIndex() < metadata.partitions) { - return loadOrCreatePersistentTopic(tpName, createIfMissing, - properties, topicPolicies); - } - final String errorMsg = - String.format("Illegal topic partition name %s with max allowed " - + "%d partitions", topicName, metadata.partitions); - log.warn(errorMsg); - return FutureUtil - .failedFuture(new BrokerServiceException.NotAllowedException(errorMsg)); - }); - } - return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); - }).thenCompose(optionalTopic -> { - if (!optionalTopic.isPresent() && createIfMissing) { - log.warn("[{}] Try to recreate the topic with createIfMissing=true " - + "but the returned topic is empty", topicName); - return getTopic(topicName, createIfMissing, properties); - } - return CompletableFuture.completedFuture(optionalTopic); + return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName) + .thenCompose(exists -> { + if (!exists && !createIfMissing) { + return CompletableFuture.completedFuture(Optional.empty()); + } + return getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + final String errorInfo = String.format("Topic creation encountered an exception by initialize" + + " topic policies service. topic_name=%s error_message=%s", topicName, + rc.getMessage()); + log.error(errorInfo, rc); + throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); + }).thenCompose(optionalTopicPolicies -> { + final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); + return topics.computeIfAbsent(topicName.toString(), (tpName) -> { + if (topicName.isPartitioned()) { + final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); + return fetchPartitionedTopicMetadataAsync(topicNameEntity) + .thenCompose((metadata) -> { + // Allow crate non-partitioned persistent topic that name includes + // `partition` + if (metadata.partitions == 0 + || topicName.getPartitionIndex() < metadata.partitions) { + return loadOrCreatePersistentTopic(tpName, createIfMissing, + properties, topicPolicies); + } + final String errorMsg = + String.format("Illegal topic partition name %s with max allowed " + + "%d partitions", topicName, metadata.partitions); + log.warn(errorMsg); + return FutureUtil.failedFuture( + new BrokerServiceException.NotAllowedException(errorMsg)); + }); + } + return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); + }).thenCompose(optionalTopic -> { + if (!optionalTopic.isPresent() && createIfMissing) { + log.warn("[{}] Try to recreate the topic with createIfMissing=true " + + "but the returned topic is empty", topicName); + return getTopic(topicName, createIfMissing, properties); + } + return CompletableFuture.completedFuture(optionalTopic); + }); }); }); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index f0bc80fa36495..285df382c7f0a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3389,4 +3389,32 @@ private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Except // cleanup. admin.namespaces().deleteNamespace(ns); } + + @Test + private void testGetStatsIfPartitionNotExists() throws Exception { + // create topic. + final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp"); + admin.topics().createPartitionedTopic(partitionedTp, 1); + TopicName partition0 = TopicName.get(partitionedTp).getPartition(0); + boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent(); + assertTrue(topicExists1); + // Verify topics-stats works. + TopicStats topicStats = admin.topics().getStats(partition0.toString()); + assertNotNull(topicStats); + + // Delete partition and call topic-stats again. + admin.topics().delete(partition0.toString()); + boolean topicExists2 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent(); + assertFalse(topicExists2); + // Verify: respond 404. + try { + admin.topics().getStats(partition0.toString()); + fail("Should respond 404 after the partition was deleted"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Topic partitions were not yet created")); + } + + // cleanup. + admin.topics().deletePartitionedTopic(partitionedTp); + } } From 202d367e188fab3cf8942b5590f8521ef7b387a1 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 31 Jan 2024 01:46:52 +0800 Subject: [PATCH 2/9] do not print error log if respond 404 --- .../pulsar/broker/admin/AdminResource.java | 7 ++++ .../admin/impl/PersistentTopicsBase.java | 4 +-- .../broker/admin/v2/PersistentTopics.java | 36 +++++++++---------- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 54bc0077103b0..27c3471b018b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -877,4 +877,11 @@ protected void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) { "The bucket must be specified for namespace offload."); } } + + public static boolean isRest404Exception(Throwable t) { + if (t instanceof WebApplicationException) { + return false; + } + return ((WebApplicationException) t).getResponse().getStatus() != Response.Status.NOT_FOUND.getStatusCode(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 06584344b9a35..b4b5a623ad834 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -5012,9 +5012,7 @@ protected CompletableFuture internalRemoveSubscribeRate(boolean isGlobal) protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) { Throwable cause = thr.getCause(); - if (!(cause instanceof WebApplicationException) || !( - ((WebApplicationException) cause).getResponse().getStatus() == 307 - || ((WebApplicationException) cause).getResponse().getStatus() == 404)) { + if (!isRedirectException(cause) && !isRest404Exception(cause)) { log.error("[{}] Failed to perform {} on topic {}", clientAppId(), methodName, topicName, cause); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 97531cf8ab017..aa1a8232150e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -119,7 +119,7 @@ public void getList( internalGetListAsync(Optional.ofNullable(bundle)) .thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic))) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -150,7 +150,7 @@ public void getPartitionedTopicList( .thenAccept(partitionedTopicList -> asyncResponse.resume( filterSystemTopic(partitionedTopicList, includeSystemTopic))) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -335,7 +335,7 @@ public void createNonPartitionedTopic( internalCreateNonPartitionedTopicAsync(authoritative, properties) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -825,7 +825,7 @@ public void updatePartitionedTopic( asyncResponse.resume(Response.noContent().build()); }) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}][{}] Failed to update partition to {}", clientAppId(), topicName, numPartitions, ex); } @@ -934,7 +934,7 @@ public void getProperties( internalGetPropertiesAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -970,7 +970,7 @@ public void updateProperties( internalUpdatePropertiesAsync(authoritative, properties) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1004,7 +1004,7 @@ public void removeProperties( internalRemovePropertiesAsync(authoritative, key) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to remove key {} in properties on topic {}", clientAppId(), key, topicName, ex); } @@ -1125,7 +1125,7 @@ public void deleteTopic( } else if (isManagedLedgerNotFoundException(t)) { ex = new RestException(Response.Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); - } else if (!isRedirectException(ex)) { + } else if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1209,7 +1209,7 @@ public void getStats( .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to get stats for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1243,7 +1243,7 @@ public void getInternalStats( internalGetInternalStatsAsync(authoritative, metadata) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1892,7 +1892,7 @@ public void peekNthMessage( internalPeekNthMessageAsync(decode(encodedSubName), messagePosition, authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to get peek nth message for topic {} subscription {}", clientAppId(), topicName, decode(encodedSubName), ex); } @@ -1934,7 +1934,7 @@ public void examineMessage( internalExamineMessageAsync(initialPosition, messagePosition, authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName, ex); } @@ -1976,7 +1976,7 @@ public void getMessageById( .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", clientAppId(), ledgerId, entryId, topicName, ex); } @@ -2020,7 +2020,7 @@ public void getMessageIdByTimestamp( } }) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to get message ID by timestamp {} from {}", clientAppId(), timestamp, topicName, ex); } @@ -2055,7 +2055,7 @@ public void getBacklog( log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName); ex = new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"); - } else if (!isRedirectException(ex)) { + } else if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to get estimated backlog for topic {}", clientAppId(), encodedTopic, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3173,7 +3173,7 @@ public void terminate( internalTerminateAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3269,7 +3269,7 @@ public void compactionStatus( internalCompactionStatusAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to get the status of a compaction operation for the topic {}", clientAppId(), topicName, ex); } @@ -3408,7 +3408,7 @@ public void trimTopic( validateTopicName(tenant, namespace, encodedTopic); internalTrimTopic(asyncResponse, authoritative).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isRedirectException(ex) || !isRest404Exception(ex)) { log.error("[{}] Failed to trim topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); From faedb0626fd6d6832979e1fd59853c5a08a45c6a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 31 Jan 2024 01:53:09 +0800 Subject: [PATCH 3/9] - --- .../broker/admin/v2/PersistentTopics.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index aa1a8232150e1..48871ca5b8edf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -119,7 +119,7 @@ public void getList( internalGetListAsync(Optional.ofNullable(bundle)) .thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic))) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -150,7 +150,7 @@ public void getPartitionedTopicList( .thenAccept(partitionedTopicList -> asyncResponse.resume( filterSystemTopic(partitionedTopicList, includeSystemTopic))) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -335,7 +335,7 @@ public void createNonPartitionedTopic( internalCreateNonPartitionedTopicAsync(authoritative, properties) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -825,7 +825,7 @@ public void updatePartitionedTopic( asyncResponse.resume(Response.noContent().build()); }) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}][{}] Failed to update partition to {}", clientAppId(), topicName, numPartitions, ex); } @@ -934,7 +934,7 @@ public void getProperties( internalGetPropertiesAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -970,7 +970,7 @@ public void updateProperties( internalUpdatePropertiesAsync(authoritative, properties) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1004,7 +1004,7 @@ public void removeProperties( internalRemovePropertiesAsync(authoritative, key) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to remove key {} in properties on topic {}", clientAppId(), key, topicName, ex); } @@ -1125,7 +1125,7 @@ public void deleteTopic( } else if (isManagedLedgerNotFoundException(t)) { ex = new RestException(Response.Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); - } else if (!isRedirectException(ex) || !isRest404Exception(ex)) { + } else if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1209,7 +1209,7 @@ public void getStats( .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to get stats for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1243,7 +1243,7 @@ public void getInternalStats( internalGetInternalStatsAsync(authoritative, metadata) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1892,7 +1892,7 @@ public void peekNthMessage( internalPeekNthMessageAsync(decode(encodedSubName), messagePosition, authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to get peek nth message for topic {} subscription {}", clientAppId(), topicName, decode(encodedSubName), ex); } @@ -1934,7 +1934,7 @@ public void examineMessage( internalExamineMessageAsync(initialPosition, messagePosition, authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName, ex); } @@ -1976,7 +1976,7 @@ public void getMessageById( .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", clientAppId(), ledgerId, entryId, topicName, ex); } @@ -2020,7 +2020,7 @@ public void getMessageIdByTimestamp( } }) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to get message ID by timestamp {} from {}", clientAppId(), timestamp, topicName, ex); } @@ -2055,7 +2055,7 @@ public void getBacklog( log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName); ex = new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"); - } else if (!isRedirectException(ex) || !isRest404Exception(ex)) { + } else if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to get estimated backlog for topic {}", clientAppId(), encodedTopic, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3173,7 +3173,7 @@ public void terminate( internalTerminateAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3269,7 +3269,7 @@ public void compactionStatus( internalCompactionStatusAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to get the status of a compaction operation for the topic {}", clientAppId(), topicName, ex); } @@ -3408,7 +3408,7 @@ public void trimTopic( validateTopicName(tenant, namespace, encodedTopic); internalTrimTopic(asyncResponse, authoritative).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex) || !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isRest404Exception(ex)) { log.error("[{}] Failed to trim topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); From 29b8bcb4c312a693121e94718d7bfcbbccd7e3da Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 31 Jan 2024 10:48:58 +0800 Subject: [PATCH 4/9] fix bug --- .../main/java/org/apache/pulsar/broker/admin/AdminResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 27c3471b018b0..4c38dab3287f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -882,6 +882,6 @@ public static boolean isRest404Exception(Throwable t) { if (t instanceof WebApplicationException) { return false; } - return ((WebApplicationException) t).getResponse().getStatus() != Response.Status.NOT_FOUND.getStatusCode(); + return ((WebApplicationException) t).getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode(); } } From aaa960b739dd0c854357cdb70524909cf873c8f8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 31 Jan 2024 10:51:19 +0800 Subject: [PATCH 5/9] fix bug --- .../pulsar/broker/admin/AdminResource.java | 7 ---- .../admin/impl/PersistentTopicsBase.java | 2 +- .../broker/admin/v2/PersistentTopics.java | 36 +++++++++---------- 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 4c38dab3287f5..54bc0077103b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -877,11 +877,4 @@ protected void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) { "The bucket must be specified for namespace offload."); } } - - public static boolean isRest404Exception(Throwable t) { - if (t instanceof WebApplicationException) { - return false; - } - return ((WebApplicationException) t).getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode(); - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index b4b5a623ad834..1922bdbc31d20 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -5012,7 +5012,7 @@ protected CompletableFuture internalRemoveSubscribeRate(boolean isGlobal) protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) { Throwable cause = thr.getCause(); - if (!isRedirectException(cause) && !isRest404Exception(cause)) { + if (!isRedirectException(cause) && !isNotFoundException(cause)) { log.error("[{}] Failed to perform {} on topic {}", clientAppId(), methodName, topicName, cause); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 48871ca5b8edf..09aacb87edc42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -119,7 +119,7 @@ public void getList( internalGetListAsync(Optional.ofNullable(bundle)) .thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic))) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -150,7 +150,7 @@ public void getPartitionedTopicList( .thenAccept(partitionedTopicList -> asyncResponse.resume( filterSystemTopic(partitionedTopicList, includeSystemTopic))) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -335,7 +335,7 @@ public void createNonPartitionedTopic( internalCreateNonPartitionedTopicAsync(authoritative, properties) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -825,7 +825,7 @@ public void updatePartitionedTopic( asyncResponse.resume(Response.noContent().build()); }) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}][{}] Failed to update partition to {}", clientAppId(), topicName, numPartitions, ex); } @@ -934,7 +934,7 @@ public void getProperties( internalGetPropertiesAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -970,7 +970,7 @@ public void updateProperties( internalUpdatePropertiesAsync(authoritative, properties) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1004,7 +1004,7 @@ public void removeProperties( internalRemovePropertiesAsync(authoritative, key) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to remove key {} in properties on topic {}", clientAppId(), key, topicName, ex); } @@ -1125,7 +1125,7 @@ public void deleteTopic( } else if (isManagedLedgerNotFoundException(t)) { ex = new RestException(Response.Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); - } else if (!isRedirectException(ex) && !isRest404Exception(ex)) { + } else if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1209,7 +1209,7 @@ public void getStats( .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to get stats for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1243,7 +1243,7 @@ public void getInternalStats( internalGetInternalStatsAsync(authoritative, metadata) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1892,7 +1892,7 @@ public void peekNthMessage( internalPeekNthMessageAsync(decode(encodedSubName), messagePosition, authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to get peek nth message for topic {} subscription {}", clientAppId(), topicName, decode(encodedSubName), ex); } @@ -1934,7 +1934,7 @@ public void examineMessage( internalExamineMessageAsync(initialPosition, messagePosition, authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName, ex); } @@ -1976,7 +1976,7 @@ public void getMessageById( .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", clientAppId(), ledgerId, entryId, topicName, ex); } @@ -2020,7 +2020,7 @@ public void getMessageIdByTimestamp( } }) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to get message ID by timestamp {} from {}", clientAppId(), timestamp, topicName, ex); } @@ -2055,7 +2055,7 @@ public void getBacklog( log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName); ex = new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"); - } else if (!isRedirectException(ex) && !isRest404Exception(ex)) { + } else if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to get estimated backlog for topic {}", clientAppId(), encodedTopic, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3173,7 +3173,7 @@ public void terminate( internalTerminateAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3269,7 +3269,7 @@ public void compactionStatus( internalCompactionStatusAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to get the status of a compaction operation for the topic {}", clientAppId(), topicName, ex); } @@ -3408,7 +3408,7 @@ public void trimTopic( validateTopicName(tenant, namespace, encodedTopic); internalTrimTopic(asyncResponse, authoritative).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex) && !isRest404Exception(ex)) { + if (!isRedirectException(ex) && !isNotFoundException(ex)) { log.error("[{}] Failed to trim topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); From cd50855de5b3659e92a8a3cf4d3dfe3e2f5d2b9b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 31 Jan 2024 15:23:12 +0800 Subject: [PATCH 6/9] fix test --- .../pulsar/broker/service/persistent/PersistentTopicTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 4eb2aa15fa292..cda62e5567e6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -296,7 +296,8 @@ public void testPersistentPartitionedTopicUnload() throws Exception { assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName)); pulsar.getBrokerService().getTopicIfExists(topicName).get(); - assertTrue(pulsar.getBrokerService().getTopics().containsKey(topicName)); + // The map topics should only contain partitions, does not contain partitioned topic. + assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName)); // ref of partitioned-topic name should be empty assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); From e1e43be20441c688a8162c50ba8394139fa1c12c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 31 Jan 2024 16:03:50 +0800 Subject: [PATCH 7/9] fix test --- .../pulsar/broker/TopicEventsListenerTest.java | 14 +++++++++++--- .../pulsar/broker/admin/TopicAutoCreationTest.java | 14 ++++++++------ 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java index e6459bbf74c31..3bbda47c7c641 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java @@ -278,11 +278,19 @@ private void createTopicAndVerifyEvents(String topicTypePartitioned, String topi } else { topicNameToWatch = topicName; - admin.topics().createNonPartitionedTopic(topicName); + try { + admin.topics().createNonPartitionedTopic(topicName); + }catch (Exception ex) { + + } expectedEvents = new String[]{ - "LOAD__BEFORE", - "LOAD__FAILURE", + // Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic + // was already exists, and the action "check topic exists" will try to load Managed ledger, + // the check triggers two exrtra events: [LOAD__BEFORE, LOAD__FAILURE]. + // #21995 fixed this wrong behavior, so remove these two events. + // "LOAD__BEFORE", + // "LOAD__FAILURE", "LOAD__BEFORE", "CREATE__BEFORE", "CREATE__SUCCESS", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index 9cd1cf214f67e..bb4a23bf24bd9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -149,10 +149,11 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() .sendTimeout(1, TimeUnit.SECONDS) .topic(topic) .create()) { - } catch (PulsarClientException.LookupException expected) { - String msg = "Namespace bundle for topic (%s) not served by this instance"; + } catch (PulsarClientException.TopicDoesNotExistException expected) { + // Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false, + // so the "TopicDoesNotExistException" is expected. log.info("Expected error", expected); - assertTrue(expected.getMessage().contains(String.format(msg, topic)) + assertTrue(expected.getMessage().contains(topic) || expected.getMessage().contains(topicPoliciesServiceInitException)); } @@ -160,10 +161,11 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() .topic(topic) .subscriptionName("test") .subscribe()) { - } catch (PulsarClientException.LookupException expected) { - String msg = "Namespace bundle for topic (%s) not served by this instance"; + } catch (PulsarClientException.TopicDoesNotExistException expected) { + // Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false, + // so the "TopicDoesNotExistException" is expected. log.info("Expected error", expected); - assertTrue(expected.getMessage().contains(String.format(msg, topic)) + assertTrue(expected.getMessage().contains(topic) || expected.getMessage().contains(topicPoliciesServiceInitException)); } From 1fe7ba8e6890f9dfa070fc26cf72b574d1a3eecd Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 31 Jan 2024 16:06:45 +0800 Subject: [PATCH 8/9] split to 2 PRs --- .../pulsar/broker/service/BrokerService.java | 80 +++++++++---------- .../broker/TopicEventsListenerTest.java | 14 +--- .../pulsar/broker/admin/AdminApi2Test.java | 28 ------- .../broker/admin/TopicAutoCreationTest.java | 14 ++-- .../persistent/PersistentTopicTest.java | 3 +- 5 files changed, 47 insertions(+), 92 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 226d8046a3bd1..8654a8300502c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -969,49 +969,43 @@ public CompletableFuture> getTopic(final TopicName topicName, bo } final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent); if (isPersistentTopic) { - return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName) - .thenCompose(exists -> { - if (!exists && !createIfMissing) { - return CompletableFuture.completedFuture(Optional.empty()); - } - return getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - final String errorInfo = String.format("Topic creation encountered an exception by initialize" - + " topic policies service. topic_name=%s error_message=%s", topicName, - rc.getMessage()); - log.error(errorInfo, rc); - throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); - }).thenCompose(optionalTopicPolicies -> { - final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); - return topics.computeIfAbsent(topicName.toString(), (tpName) -> { - if (topicName.isPartitioned()) { - final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); - return fetchPartitionedTopicMetadataAsync(topicNameEntity) - .thenCompose((metadata) -> { - // Allow crate non-partitioned persistent topic that name includes - // `partition` - if (metadata.partitions == 0 - || topicName.getPartitionIndex() < metadata.partitions) { - return loadOrCreatePersistentTopic(tpName, createIfMissing, - properties, topicPolicies); - } - final String errorMsg = - String.format("Illegal topic partition name %s with max allowed " - + "%d partitions", topicName, metadata.partitions); - log.warn(errorMsg); - return FutureUtil.failedFuture( - new BrokerServiceException.NotAllowedException(errorMsg)); - }); - } - return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); - }).thenCompose(optionalTopic -> { - if (!optionalTopic.isPresent() && createIfMissing) { - log.warn("[{}] Try to recreate the topic with createIfMissing=true " - + "but the returned topic is empty", topicName); - return getTopic(topicName, createIfMissing, properties); - } - return CompletableFuture.completedFuture(optionalTopic); - }); + final CompletableFuture> topicPoliciesFuture = + getTopicPoliciesBypassSystemTopic(topicName); + return topicPoliciesFuture.exceptionally(ex -> { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + final String errorInfo = String.format("Topic creation encountered an exception by initialize" + + " topic policies service. topic_name=%s error_message=%s", topicName, rc.getMessage()); + log.error(errorInfo, rc); + throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); + }).thenCompose(optionalTopicPolicies -> { + final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); + return topics.computeIfAbsent(topicName.toString(), (tpName) -> { + if (topicName.isPartitioned()) { + final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); + return fetchPartitionedTopicMetadataAsync(topicNameEntity) + .thenCompose((metadata) -> { + // Allow crate non-partitioned persistent topic that name includes `partition` + if (metadata.partitions == 0 + || topicName.getPartitionIndex() < metadata.partitions) { + return loadOrCreatePersistentTopic(tpName, createIfMissing, + properties, topicPolicies); + } + final String errorMsg = + String.format("Illegal topic partition name %s with max allowed " + + "%d partitions", topicName, metadata.partitions); + log.warn(errorMsg); + return FutureUtil + .failedFuture(new BrokerServiceException.NotAllowedException(errorMsg)); + }); + } + return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); + }).thenCompose(optionalTopic -> { + if (!optionalTopic.isPresent() && createIfMissing) { + log.warn("[{}] Try to recreate the topic with createIfMissing=true " + + "but the returned topic is empty", topicName); + return getTopic(topicName, createIfMissing, properties); + } + return CompletableFuture.completedFuture(optionalTopic); }); }); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java index 3bbda47c7c641..e6459bbf74c31 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java @@ -278,19 +278,11 @@ private void createTopicAndVerifyEvents(String topicTypePartitioned, String topi } else { topicNameToWatch = topicName; - try { - admin.topics().createNonPartitionedTopic(topicName); - }catch (Exception ex) { - - } + admin.topics().createNonPartitionedTopic(topicName); expectedEvents = new String[]{ - // Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic - // was already exists, and the action "check topic exists" will try to load Managed ledger, - // the check triggers two exrtra events: [LOAD__BEFORE, LOAD__FAILURE]. - // #21995 fixed this wrong behavior, so remove these two events. - // "LOAD__BEFORE", - // "LOAD__FAILURE", + "LOAD__BEFORE", + "LOAD__FAILURE", "LOAD__BEFORE", "CREATE__BEFORE", "CREATE__SUCCESS", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 285df382c7f0a..f0bc80fa36495 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3389,32 +3389,4 @@ private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Except // cleanup. admin.namespaces().deleteNamespace(ns); } - - @Test - private void testGetStatsIfPartitionNotExists() throws Exception { - // create topic. - final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp"); - admin.topics().createPartitionedTopic(partitionedTp, 1); - TopicName partition0 = TopicName.get(partitionedTp).getPartition(0); - boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent(); - assertTrue(topicExists1); - // Verify topics-stats works. - TopicStats topicStats = admin.topics().getStats(partition0.toString()); - assertNotNull(topicStats); - - // Delete partition and call topic-stats again. - admin.topics().delete(partition0.toString()); - boolean topicExists2 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent(); - assertFalse(topicExists2); - // Verify: respond 404. - try { - admin.topics().getStats(partition0.toString()); - fail("Should respond 404 after the partition was deleted"); - } catch (Exception ex) { - assertTrue(ex.getMessage().contains("Topic partitions were not yet created")); - } - - // cleanup. - admin.topics().deletePartitionedTopic(partitionedTp); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index bb4a23bf24bd9..9cd1cf214f67e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -149,11 +149,10 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() .sendTimeout(1, TimeUnit.SECONDS) .topic(topic) .create()) { - } catch (PulsarClientException.TopicDoesNotExistException expected) { - // Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false, - // so the "TopicDoesNotExistException" is expected. + } catch (PulsarClientException.LookupException expected) { + String msg = "Namespace bundle for topic (%s) not served by this instance"; log.info("Expected error", expected); - assertTrue(expected.getMessage().contains(topic) + assertTrue(expected.getMessage().contains(String.format(msg, topic)) || expected.getMessage().contains(topicPoliciesServiceInitException)); } @@ -161,11 +160,10 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() .topic(topic) .subscriptionName("test") .subscribe()) { - } catch (PulsarClientException.TopicDoesNotExistException expected) { - // Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false, - // so the "TopicDoesNotExistException" is expected. + } catch (PulsarClientException.LookupException expected) { + String msg = "Namespace bundle for topic (%s) not served by this instance"; log.info("Expected error", expected); - assertTrue(expected.getMessage().contains(topic) + assertTrue(expected.getMessage().contains(String.format(msg, topic)) || expected.getMessage().contains(topicPoliciesServiceInitException)); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index cda62e5567e6a..4eb2aa15fa292 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -296,8 +296,7 @@ public void testPersistentPartitionedTopicUnload() throws Exception { assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName)); pulsar.getBrokerService().getTopicIfExists(topicName).get(); - // The map topics should only contain partitions, does not contain partitioned topic. - assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName)); + assertTrue(pulsar.getBrokerService().getTopics().containsKey(topicName)); // ref of partitioned-topic name should be empty assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); From 47d70d85a5c56f4bb94be3b8d96882ac38e4e3b1 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 2 Feb 2024 02:14:54 +0800 Subject: [PATCH 9/9] add method isNot307And404Exception --- .../pulsar/broker/admin/AdminResource.java | 4 + .../admin/impl/PersistentTopicsBase.java | 86 +++++++++---------- .../admin/impl/SchemasResourceBase.java | 2 +- .../broker/admin/v2/NonPersistentTopics.java | 6 +- .../broker/admin/v2/PersistentTopics.java | 36 ++++---- .../pulsar/broker/admin/v3/Transactions.java | 12 +-- 6 files changed, 75 insertions(+), 71 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 54bc0077103b0..8eba6cc7b050b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -834,6 +834,10 @@ protected static boolean isNotFoundException(Throwable ex) { == Status.NOT_FOUND.getStatusCode(); } + protected static boolean isNot307And404Exception(Throwable ex) { + return !isRedirectException(ex) && !isNotFoundException(ex); + } + protected static String getTopicNotFoundErrorMessage(String topic) { return String.format("Topic %s not found", topic); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 1922bdbc31d20..404f36773d565 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -874,7 +874,7 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned metadata while unloading topic {}", clientAppId(), topicName, ex); } @@ -884,7 +884,7 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to validate the global namespace ownership while unloading topic {}", clientAppId(), topicName, ex); } @@ -1052,7 +1052,7 @@ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, })) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1074,7 +1074,7 @@ private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncRespon })) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to unload tc {},{}", clientAppId(), topicName.getPartitionIndex(), ex); } @@ -1176,7 +1176,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned topic metadata while get" + " subscriptions for topic {}", clientAppId(), topicName, ex); } @@ -1186,7 +1186,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to validate the global namespace/topic ownership while get subscriptions" + " for topic {}", clientAppId(), topicName, ex); } @@ -1195,7 +1195,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut }) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get subscriptions for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1234,7 +1234,7 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR .thenAccept(topic -> asyncResponse.resume(new ArrayList<>(topic.getSubscriptions().keys()))) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1343,7 +1343,7 @@ protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned metadata while get managed info for {}", clientAppId(), topicName, ex); } @@ -1353,7 +1353,7 @@ protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to validate the global namespace ownership while get managed info for {}", clientAppId(), topicName, ex); } @@ -1472,7 +1472,7 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1527,7 +1527,7 @@ protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1655,7 +1655,7 @@ private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncRespo }).exceptionally(ex -> { Throwable cause = ex.getCause(); // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to analyze subscription backlog {} {}", clientAppId(), topicName, subName, cause); } @@ -1682,7 +1682,7 @@ private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncRes }).exceptionally(ex -> { Throwable cause = ex.getCause(); // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause); } asyncResponse.resume(new RestException(cause)); @@ -1711,7 +1711,7 @@ private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncRespon }).exceptionally(ex -> { Throwable cause = ex.getCause(); // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause); } asyncResponse.resume(new RestException(cause)); @@ -1880,7 +1880,7 @@ protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subNa } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, ex); } @@ -1924,7 +1924,7 @@ private CompletableFuture internalSkipAllMessagesForNonPartitionedTopicAsy } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, ex); } @@ -1988,7 +1988,7 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, }) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to skip {} messages {} {}", clientAppId(), numMessages, topicName, subName, ex); } @@ -2058,7 +2058,7 @@ protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResp ) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, ex); } @@ -2125,7 +2125,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy }) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to expire messages for all subscription up to {} on {}", clientAppId(), expireTimeInSeconds, topicName, ex); } @@ -2332,7 +2332,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su })).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, ex); } @@ -2342,7 +2342,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, ex); } @@ -2473,7 +2473,7 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to update subscription {} from topic {}", clientAppId(), subName, topicName, ex); } @@ -2513,7 +2513,7 @@ protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, S }) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to analyze back log of subscription {} from topic {}", clientAppId(), subName, topicName, ex); } @@ -2598,7 +2598,7 @@ protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, St } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to update subscription {} from topic {}", clientAppId(), subName, topicName, ex); } @@ -2684,7 +2684,7 @@ protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), topicName, subName, messageId, ex.getCause()); } @@ -2693,7 +2693,7 @@ protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), topicName, subName, messageId, ex.getCause()); } @@ -3329,7 +3329,7 @@ protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse, } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get backlog size for topic {}", clientAppId(), topicName, ex); } @@ -3337,7 +3337,7 @@ protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse, return null; })).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to validate global namespace ownership " + "to get backlog size for topic {}", clientAppId(), topicName, ex); } @@ -3903,7 +3903,7 @@ protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3911,7 +3911,7 @@ protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo }) ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -4001,7 +4001,7 @@ protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, St ).exceptionally(ex -> { Throwable cause = FutureUtil.unwrapCompletionException(ex); // If the exception is not redirect exception we need to log it. - if (!isRedirectException(cause)) { + if (!isNot307And404Exception(cause)) { if (cause instanceof RestException) { log.warn("[{}] Failed to expire messages up to {} on {}: {}", clientAppId(), expireTimeInSeconds, topicName, cause.toString()); @@ -4116,7 +4116,7 @@ protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, Str messageId, isExcluded, batchIndex); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to expire messages up to {} on subscription {} to position {}", clientAppId(), topicName, subName, messageId, ex); } @@ -4266,7 +4266,7 @@ protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean au } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -4275,7 +4275,7 @@ protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean au } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to validate global namespace ownership to trigger compaction on topic {}", clientAppId(), topicName, ex); } @@ -4304,7 +4304,7 @@ protected void internalTriggerCompactionNonPartitionedTopic(AsyncResponse asyncR } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to trigger compaction for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -4340,7 +4340,7 @@ protected void internalTriggerOffload(AsyncResponse asyncResponse, } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to trigger offload for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -4357,7 +4357,7 @@ protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean author asyncResponse.resume(offloadProcessStatus); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to offload status on topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -4634,7 +4634,7 @@ protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean aut }); }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get last messageId {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -5012,7 +5012,7 @@ protected CompletableFuture internalRemoveSubscribeRate(boolean isGlobal) protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) { Throwable cause = thr.getCause(); - if (!isRedirectException(cause) && !isNotFoundException(cause)) { + if (isNot307And404Exception(cause)) { log.error("[{}] Failed to perform {} on topic {}", clientAppId(), methodName, topicName, cause); } @@ -5138,7 +5138,7 @@ protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncRespon resultFuture.exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled, topicName, subName, ex); } @@ -5185,7 +5185,7 @@ private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic( } ).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to set replicated subscription status on {} {}", clientAppId(), topicName, subName, ex); } @@ -5286,7 +5286,7 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon } resultFuture.exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(), topicName, subName, ex); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java index 454b8f0fac82c..286366c8b5834 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java @@ -238,7 +238,7 @@ private CompletableFuture validateOwnershipAndOperationAsync(boolean autho protected boolean shouldPrintErrorLog(Throwable ex) { - return !isRedirectException(ex) && !isNotFoundException(ex); + return isNot307And404Exception(ex); } private static final Logger log = LoggerFactory.getLogger(SchemasResourceBase.class); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index d4795393f9b03..a22ad4b242f57 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -132,7 +132,7 @@ public void getInternalStats( }) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -479,7 +479,7 @@ public void getListFromBundle( } asyncResponse.resume(topicList); }).exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, ex); } @@ -488,7 +488,7 @@ public void getListFromBundle( }); } }).exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, ex); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 09aacb87edc42..32667fcf1eb13 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -119,7 +119,7 @@ public void getList( internalGetListAsync(Optional.ofNullable(bundle)) .thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic))) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -150,7 +150,7 @@ public void getPartitionedTopicList( .thenAccept(partitionedTopicList -> asyncResponse.resume( filterSystemTopic(partitionedTopicList, includeSystemTopic))) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -335,7 +335,7 @@ public void createNonPartitionedTopic( internalCreateNonPartitionedTopicAsync(authoritative, properties) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -825,7 +825,7 @@ public void updatePartitionedTopic( asyncResponse.resume(Response.noContent().build()); }) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}][{}] Failed to update partition to {}", clientAppId(), topicName, numPartitions, ex); } @@ -934,7 +934,7 @@ public void getProperties( internalGetPropertiesAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -970,7 +970,7 @@ public void updateProperties( internalUpdatePropertiesAsync(authoritative, properties) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1004,7 +1004,7 @@ public void removeProperties( internalRemovePropertiesAsync(authoritative, key) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to remove key {} in properties on topic {}", clientAppId(), key, topicName, ex); } @@ -1125,7 +1125,7 @@ public void deleteTopic( } else if (isManagedLedgerNotFoundException(t)) { ex = new RestException(Response.Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); - } else if (!isRedirectException(ex) && !isNotFoundException(ex)) { + } else if (isNot307And404Exception(ex)) { log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1209,7 +1209,7 @@ public void getStats( .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get stats for {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1243,7 +1243,7 @@ public void getInternalStats( internalGetInternalStatsAsync(authoritative, metadata) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1892,7 +1892,7 @@ public void peekNthMessage( internalPeekNthMessageAsync(decode(encodedSubName), messagePosition, authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get peek nth message for topic {} subscription {}", clientAppId(), topicName, decode(encodedSubName), ex); } @@ -1934,7 +1934,7 @@ public void examineMessage( internalExamineMessageAsync(initialPosition, messagePosition, authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName, ex); } @@ -1976,7 +1976,7 @@ public void getMessageById( .thenAccept(asyncResponse::resume) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", clientAppId(), ledgerId, entryId, topicName, ex); } @@ -2020,7 +2020,7 @@ public void getMessageIdByTimestamp( } }) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get message ID by timestamp {} from {}", clientAppId(), timestamp, topicName, ex); } @@ -2055,7 +2055,7 @@ public void getBacklog( log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), namespaceName); ex = new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"); - } else if (!isRedirectException(ex) && !isNotFoundException(ex)) { + } else if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get estimated backlog for topic {}", clientAppId(), encodedTopic, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3173,7 +3173,7 @@ public void terminate( internalTerminateAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -3269,7 +3269,7 @@ public void compactionStatus( internalCompactionStatusAsync(authoritative) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get the status of a compaction operation for the topic {}", clientAppId(), topicName, ex); } @@ -3408,7 +3408,7 @@ public void trimTopic( validateTopicName(tenant, namespace, encodedTopic); internalTrimTopic(asyncResponse, authoritative).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex) && !isNotFoundException(ex)) { + if (isNot307And404Exception(ex)) { log.error("[{}] Failed to trim topic {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java index 83ee03b2e4fa6..c2a54987ea2d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java @@ -105,7 +105,7 @@ public void getTransactionInBufferStats(@Suspended final AsyncResponse asyncResp Long.parseLong(leastSigBits)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get transaction state in transaction buffer {}", clientAppId(), topicName, ex); } @@ -143,7 +143,7 @@ public void getTransactionInPendingAckStats(@Suspended final AsyncResponse async Long.parseLong(leastSigBits), subName) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get transaction state in pending ack {}", clientAppId(), topicName, ex); } @@ -181,7 +181,7 @@ public void getTransactionBufferStats(@Suspended final AsyncResponse asyncRespon internalGetTransactionBufferStats(authoritative, lowWaterMarks, segmentStats) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get transaction buffer stats in topic {}", clientAppId(), topicName, ex); } @@ -217,7 +217,7 @@ public void getPendingAckStats(@Suspended final AsyncResponse asyncResponse, internalGetPendingAckStats(authoritative, subName, lowWaterMarks) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get transaction pending ack stats in topic {}", clientAppId(), topicName, ex); } @@ -314,7 +314,7 @@ public void getPendingAckInternalStats(@Suspended final AsyncResponse asyncRespo internalGetPendingAckInternalStats(authoritative, subName, metadata) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get pending ack internal stats {}", clientAppId(), topicName, ex); } @@ -365,7 +365,7 @@ public void getTransactionBufferInternalStats(@Suspended final AsyncResponse asy internalGetTransactionBufferInternalStats(authoritative, metadata) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get transaction buffer internal stats {}", clientAppId(), topicName, ex); }