diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index ba35c8a280e9e..fcadab228e572 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1229,7 +1229,7 @@ protected boolean isExceedMaximumMessageSize(int size, PublishContext publishCon /** * update topic publish dispatcher for this topic. */ - public void updatePublishDispatcher() { + public void updatePublishRateLimiter() { PublishRate publishRate = topicPolicies.getPublishRate().get(); if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) { log.info("Enabling publish rate limiting {} on topic {}", publishRate, getName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4077762bb0640..b3b6e778596f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2706,7 +2706,7 @@ private void updateMaxPublishRatePerTopicInMessages() { forEachTopic(topic -> { if (topic instanceof AbstractTopic) { ((AbstractTopic) topic).updateBrokerPublishRate(); - ((AbstractTopic) topic).updatePublishDispatcher(); + ((AbstractTopic) topic).updatePublishRateLimiter(); } })); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 00cf3a6583b9a..139507ba10205 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -178,7 +178,7 @@ public CompletableFuture initialize() { isEncryptionRequired = policies.encryption_required; isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; } - updatePublishDispatcher(); + updatePublishRateLimiter(); updateResourceGroupLimiter(policies); return updateClusterMigrated(); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cfb369dd59eb1..e3cfafff57f2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -357,7 +357,7 @@ public CompletableFuture initialize() { .thenAcceptAsync(optPolicies -> { if (!optPolicies.isPresent()) { isEncryptionRequired = false; - updatePublishDispatcher(); + updatePublishRateLimiter(); updateResourceGroupLimiter(new Policies()); initializeDispatchRateLimiterIfNeeded(); updateSubscribeRateLimiter(); @@ -372,7 +372,7 @@ public CompletableFuture initialize() { updateSubscribeRateLimiter(); - updatePublishDispatcher(); + updatePublishRateLimiter(); updateResourceGroupLimiter(policies); @@ -3086,39 +3086,60 @@ public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { return CompletableFuture.completedFuture(null); } + // Update props. + // The component "EntryFilters" is update in the method "updateTopicPolicyByNamespacePolicy(data)". + // see more detail: https://github.com/apache/pulsar/pull/19364. updateTopicPolicyByNamespacePolicy(data); checkReplicatedSubscriptionControllerState(); isEncryptionRequired = data.encryption_required; - isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; - updateDispatchRateLimiter(); - - updateSubscribeRateLimiter(); + // Apply policies for components. + List> applyPolicyTasks = applyUpdatedTopicPolicies(); + applyPolicyTasks.add(applyUpdatedNamespacePolicies(data)); + return FutureUtil.waitForAll(applyPolicyTasks) + .thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic)) + .exceptionally(ex -> { + log.error("[{}] update namespace polices : {} error", this.getName(), data, ex); + throw FutureUtil.wrapToCompletionException(ex); + }); + } - updatePublishDispatcher(); + private CompletableFuture applyUpdatedNamespacePolicies(Policies namespaceLevelPolicies) { + return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter(namespaceLevelPolicies)); + } - updateResourceGroupLimiter(data); + private List> applyUpdatedTopicPolicies() { + List> applyPoliciesFutureList = new ArrayList<>(); - List> producerCheckFutures = new ArrayList<>(producers.size()); - producers.values().forEach(producer -> producerCheckFutures.add( + // Client permission check. + subscriptions.forEach((subName, sub) -> { + sub.getConsumers().forEach(consumer -> applyPoliciesFutureList.add(consumer.checkPermissionsAsync())); + }); + producers.values().forEach(producer -> applyPoliciesFutureList.add( producer.checkPermissionsAsync().thenRun(producer::checkEncryption))); + // Check message expiry. + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkMessageExpiry())); - return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> { - return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> { - replicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - checkMessageExpiry(); - CompletableFuture replicationFuture = checkReplicationAndRetryOnFailure(); - CompletableFuture dedupFuture = checkDeduplicationStatus(); - CompletableFuture persistentPoliciesFuture = checkPersistencePolicies(); - return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture, - preCreateSubscriptionForCompactionIfNeeded()); - }); - }).exceptionally(ex -> { - log.error("[{}] update namespace polices : {} error", this.getName(), data, ex); - throw FutureUtil.wrapToCompletionException(ex); - }); + // Update rate limiters. + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateDispatchRateLimiter())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscribeRateLimiter())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updatePublishRateLimiter())); + + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscriptionsDispatcherRateLimiter())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread( + () -> replicators.forEach((name, replicator) -> replicator.updateRateLimiter()))); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread( + () -> shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter()))); + + // Other components. + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkReplicationAndRetryOnFailure())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkDeduplicationStatus())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkPersistencePolicies())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread( + () -> preCreateSubscriptionForCompactionIfNeeded())); + + return applyPoliciesFutureList; } /** @@ -3778,42 +3799,30 @@ public void onUpdate(TopicPolicies policies) { if (policies == null) { return; } + // Update props. + // The component "EntryFilters" is update in the method "updateTopicPolicy(data)". + // see more detail: https://github.com/apache/pulsar/pull/19364. updateTopicPolicy(policies); shadowTopics = policies.getShadowTopics(); - updateDispatchRateLimiter(); checkReplicatedSubscriptionControllerState(); - updateSubscriptionsDispatcherRateLimiter().thenRun(() -> { - updatePublishDispatcher(); - updateSubscribeRateLimiter(); - replicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - checkMessageExpiry(); - }) - .thenCompose(__ -> checkReplicationAndRetryOnFailure()) - .thenCompose(__ -> checkDeduplicationStatus()) - .thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded()) - .thenCompose(__ -> checkPersistencePolicies()) - .thenAccept(__ -> log.info("[{}] Policies updated successfully", topic)) - .exceptionally(e -> { - Throwable t = FutureUtil.unwrapCompletionException(e); - log.error("[{}] update topic policy error: {}", topic, t.getMessage(), t); - return null; - }); + + // Apply policies for components(not contains the specified policies which only defined in namespace policies). + FutureUtil.waitForAll(applyUpdatedTopicPolicies()) + .thenAccept(__ -> log.info("[{}] topic-level policies updated successfully", topic)) + .exceptionally(e -> { + Throwable t = FutureUtil.unwrapCompletionException(e); + log.error("[{}] update topic-level policy error: {}", topic, t.getMessage(), t); + return null; + }); } - private CompletableFuture updateSubscriptionsDispatcherRateLimiter() { - List> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size()); + private void updateSubscriptionsDispatcherRateLimiter() { subscriptions.forEach((subName, sub) -> { - List> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size()); - sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync())); - subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> { - Dispatcher dispatcher = sub.getDispatcher(); - if (dispatcher != null) { - dispatcher.updateRateLimiter(); - } - })); + Dispatcher dispatcher = sub.getDispatcher(); + if (dispatcher != null) { + dispatcher.updateRateLimiter(); + } }); - return FutureUtil.waitForAll(subscriptionCheckFutures); } protected CompletableFuture initTopicPolicy() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 036d4354a5f1b..023b77a3dc088 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -42,6 +42,8 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ConfigHelper; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -50,6 +52,7 @@ import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -83,8 +86,11 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -3157,4 +3163,49 @@ public void testProduceChangesWithEncryptionRequired() throws Exception { }); } + @Test + public void testUpdateRetentionWithPartialFailure() throws Exception { + String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp"); + admin.topics().createNonPartitionedTopic(tpName); + + // Load topic up. + admin.topics().getInternalStats(tpName); + + // Inject an error that makes dispatch rate update fail. + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get(); + ConcurrentOpenHashMap subscriptions = + WhiteboxImpl.getInternalState(persistentTopic, "subscriptions"); + PersistentSubscription mockedSubscription = Mockito.mock(PersistentSubscription.class); + Mockito.when(mockedSubscription.getDispatcher()).thenThrow(new RuntimeException("Mocked error: getDispatcher")); + subscriptions.put("mockedSubscription", mockedSubscription); + + // Update namespace-level retention policies. + RetentionPolicies retentionPolicies1 = new RetentionPolicies(1, 1); + admin.namespaces().setRetentionAsync(myNamespace, retentionPolicies1); + + // Verify: update retention will be success even if other component update throws exception. + Awaitility.await().untilAsserted(() -> { + ManagedLedgerImpl ML = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + assertEquals(ML.getConfig().getRetentionSizeInMB(), 1); + assertEquals(ML.getConfig().getRetentionTimeMillis(), 1 * 60 * 1000); + }); + + // Update topic-level retention policies. + RetentionPolicies retentionPolicies2 = new RetentionPolicies(2, 2); + admin.topics().setRetentionAsync(tpName, retentionPolicies2); + + // Verify: update retention will be success even if other component update throws exception. + Awaitility.await().untilAsserted(() -> { + ManagedLedgerImpl ML = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + assertEquals(ML.getConfig().getRetentionSizeInMB(), 2); + assertEquals(ML.getConfig().getRetentionTimeMillis(), 2 * 60 * 1000); + }); + + // Cleanup. + subscriptions.clear(); + admin.namespaces().removeRetention(myNamespace); + admin.topics().delete(tpName, false); + } + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 2b082b4a7899b..6f62589853593 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.util; +import com.google.common.util.concurrent.MoreExecutors; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -55,6 +56,11 @@ public static CompletableFuture waitForAll(Collection runWithCurrentThread(Runnable runnable) { + return CompletableFuture.runAsync( + () -> runnable.run(), MoreExecutors.directExecutor()); + } + public static CompletableFuture> waitForAll(Stream>> futures) { return futures.reduce(CompletableFuture.completedFuture(new ArrayList<>()), (pre, curr) -> pre.thenCompose(preV -> curr.thenApply(currV -> {