Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[improve] [broker] Do not print an Error log when responding to HTTP-404 when calling Admin API and the topic does not exist. #21995

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5012,9 +5012,7 @@ protected CompletableFuture<Void> internalRemoveSubscribeRate(boolean isGlobal)

protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
Throwable cause = thr.getCause();
if (!(cause instanceof WebApplicationException) || !(
((WebApplicationException) cause).getResponse().getStatus() == 307
|| ((WebApplicationException) cause).getResponse().getStatus() == 404)) {
if (!isRedirectException(cause) && !isNotFoundException(cause)) {
log.error("[{}] Failed to perform {} on topic {}",
clientAppId(), methodName, topicName, cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void getList(
internalGetListAsync(Optional.ofNullable(bundle))
.thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -150,7 +150,7 @@ public void getPartitionedTopicList(
.thenAccept(partitionedTopicList -> asyncResponse.resume(
filterSystemTopic(partitionedTopicList, includeSystemTopic)))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -335,7 +335,7 @@ public void createNonPartitionedTopic(
internalCreateNonPartitionedTopicAsync(authoritative, properties)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -825,7 +825,7 @@ public void updatePartitionedTopic(
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}][{}] Failed to update partition to {}",
clientAppId(), topicName, numPartitions, ex);
}
Expand Down Expand Up @@ -934,7 +934,7 @@ public void getProperties(
internalGetPropertiesAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -970,7 +970,7 @@ public void updateProperties(
internalUpdatePropertiesAsync(authoritative, properties)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -1004,7 +1004,7 @@ public void removeProperties(
internalRemovePropertiesAsync(authoritative, key)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to remove key {} in properties on topic {}",
clientAppId(), key, topicName, ex);
}
Expand Down Expand Up @@ -1125,7 +1125,7 @@ public void deleteTopic(
} else if (isManagedLedgerNotFoundException(t)) {
ex = new RestException(Response.Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
} else if (!isRedirectException(ex)) {
} else if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -1209,7 +1209,7 @@ public void getStats(
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to get stats for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -1243,7 +1243,7 @@ public void getInternalStats(
internalGetInternalStatsAsync(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -1892,7 +1892,7 @@ public void peekNthMessage(
internalPeekNthMessageAsync(decode(encodedSubName), messagePosition, authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to get peek nth message for topic {} subscription {}", clientAppId(),
topicName, decode(encodedSubName), ex);
}
Expand Down Expand Up @@ -1934,7 +1934,7 @@ public void examineMessage(
internalExamineMessageAsync(initialPosition, messagePosition, authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName,
ex);
}
Expand Down Expand Up @@ -1976,7 +1976,7 @@ public void getMessageById(
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
clientAppId(), ledgerId, entryId, topicName, ex);
}
Expand Down Expand Up @@ -2020,7 +2020,7 @@ public void getMessageIdByTimestamp(
}
})
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to get message ID by timestamp {} from {}",
clientAppId(), timestamp, topicName, ex);
}
Expand Down Expand Up @@ -2055,7 +2055,7 @@ public void getBacklog(
log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(),
namespaceName);
ex = new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
} else if (!isRedirectException(ex)) {
} else if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to get estimated backlog for topic {}", clientAppId(), encodedTopic, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -3173,7 +3173,7 @@ public void terminate(
internalTerminateAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -3269,7 +3269,7 @@ public void compactionStatus(
internalCompactionStatusAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to get the status of a compaction operation for the topic {}",
clientAppId(), topicName, ex);
}
Expand Down Expand Up @@ -3408,7 +3408,7 @@ public void trimTopic(
validateTopicName(tenant, namespace, encodedTopic);
internalTrimTopic(asyncResponse, authoritative).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
if (!isRedirectException(ex) && !isNotFoundException(ex)) {
log.error("[{}] Failed to trim topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,43 +969,49 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
}
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
getTopicPoliciesBypassSystemTopic(topicName);
return topicPoliciesFuture.exceptionally(ex -> {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
final String errorInfo = String.format("Topic creation encountered an exception by initialize"
+ " topic policies service. topic_name=%s error_message=%s", topicName, rc.getMessage());
log.error(errorInfo, rc);
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil
.failedFuture(new BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}).thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent() && createIfMissing) {
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ "but the returned topic is empty", topicName);
return getTopic(topicName, createIfMissing, properties);
}
return CompletableFuture.completedFuture(optionalTopic);
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
.thenCompose(exists -> {
if (!exists && !createIfMissing) {
return CompletableFuture.completedFuture(Optional.empty());
}
return getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
final String errorInfo = String.format("Topic creation encountered an exception by initialize"
+ " topic policies service. topic_name=%s error_message=%s", topicName,
rc.getMessage());
log.error(errorInfo, rc);
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}).thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent() && createIfMissing) {
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ "but the returned topic is empty", topicName);
return getTopic(topicName, createIfMissing, properties);
}
return CompletableFuture.completedFuture(optionalTopic);
});
});
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3389,4 +3389,32 @@ private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Except
// cleanup.
admin.namespaces().deleteNamespace(ns);
}

@Test
private void testGetStatsIfPartitionNotExists() throws Exception {
// create topic.
final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
admin.topics().createPartitionedTopic(partitionedTp, 1);
TopicName partition0 = TopicName.get(partitionedTp).getPartition(0);
boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
assertTrue(topicExists1);
// Verify topics-stats works.
TopicStats topicStats = admin.topics().getStats(partition0.toString());
assertNotNull(topicStats);

// Delete partition and call topic-stats again.
admin.topics().delete(partition0.toString());
boolean topicExists2 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
assertFalse(topicExists2);
// Verify: respond 404.
try {
admin.topics().getStats(partition0.toString());
fail("Should respond 404 after the partition was deleted");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("Topic partitions were not yet created"));
}

// cleanup.
admin.topics().deletePartitionedTopic(partitionedTp);
}
}
Loading