From ed599673c7e60ab5bb02e1fb0615a7ff8e5d6430 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 3 Jan 2024 21:51:26 +0800 Subject: [PATCH] [fix] [broker] Update topic policies as much as possible when some ex was thrown (#21810) ### Motivation After topic policies are updated, many components are updated one by one, even if their configuration has not been modified. The following 11 components need to be updated: - `7` rate limiters (`publish`, `dispatch topic-level`, `dispatch subscription-level`, `dispatch resourceGroup-level`, `subscribe API`, `replication`, `shadow topic replication`) - update ManagedLedger configs (`retention`, `offloader`) - start/stop replication - start/stop compaction - start/stop deduplication Once one component update fails, all subsequent updates are skipped. This leads to confusing behavior: you want to set a retention policy, but it is skipped because `update subscribe rate limiter` failed (even though you did not edit the `subscribe rate limitation policy`). Since none of the components in the above list depends on any other for its individual update, it is appropriate to apply as many of the updates as possible. ### Modifications - Update topic policies as much as possible even if some component updates fail. All component updates still run in the same thread and are still applied one by one; the error is just thrown later. 
- Rename `updatePublishDispatcher` to `updatePublishRateLimiter` --- .../pulsar/broker/service/AbstractTopic.java | 2 +- .../pulsar/broker/service/BrokerService.java | 2 +- .../nonpersistent/NonPersistentTopic.java | 2 +- .../service/persistent/PersistentTopic.java | 117 ++++++++++-------- .../broker/admin/TopicPoliciesTest.java | 51 ++++++++ .../apache/pulsar/common/util/FutureUtil.java | 6 + 6 files changed, 123 insertions(+), 57 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index ba35c8a280e9e..fcadab228e572 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1229,7 +1229,7 @@ protected boolean isExceedMaximumMessageSize(int size, PublishContext publishCon /** * update topic publish dispatcher for this topic. */ - public void updatePublishDispatcher() { + public void updatePublishRateLimiter() { PublishRate publishRate = topicPolicies.getPublishRate().get(); if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) { log.info("Enabling publish rate limiting {} on topic {}", publishRate, getName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4077762bb0640..b3b6e778596f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2706,7 +2706,7 @@ private void updateMaxPublishRatePerTopicInMessages() { forEachTopic(topic -> { if (topic instanceof AbstractTopic) { ((AbstractTopic) topic).updateBrokerPublishRate(); - ((AbstractTopic) topic).updatePublishDispatcher(); + ((AbstractTopic) 
topic).updatePublishRateLimiter(); } })); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 00cf3a6583b9a..139507ba10205 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -178,7 +178,7 @@ public CompletableFuture initialize() { isEncryptionRequired = policies.encryption_required; isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; } - updatePublishDispatcher(); + updatePublishRateLimiter(); updateResourceGroupLimiter(policies); return updateClusterMigrated(); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cfb369dd59eb1..e3cfafff57f2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -357,7 +357,7 @@ public CompletableFuture initialize() { .thenAcceptAsync(optPolicies -> { if (!optPolicies.isPresent()) { isEncryptionRequired = false; - updatePublishDispatcher(); + updatePublishRateLimiter(); updateResourceGroupLimiter(new Policies()); initializeDispatchRateLimiterIfNeeded(); updateSubscribeRateLimiter(); @@ -372,7 +372,7 @@ public CompletableFuture initialize() { updateSubscribeRateLimiter(); - updatePublishDispatcher(); + updatePublishRateLimiter(); updateResourceGroupLimiter(policies); @@ -3086,39 +3086,60 @@ public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { return CompletableFuture.completedFuture(null); } + // Update props. 
+ // The component "EntryFilters" is update in the method "updateTopicPolicyByNamespacePolicy(data)". + // see more detail: https://github.com/apache/pulsar/pull/19364. updateTopicPolicyByNamespacePolicy(data); checkReplicatedSubscriptionControllerState(); isEncryptionRequired = data.encryption_required; - isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; - updateDispatchRateLimiter(); - - updateSubscribeRateLimiter(); + // Apply policies for components. + List> applyPolicyTasks = applyUpdatedTopicPolicies(); + applyPolicyTasks.add(applyUpdatedNamespacePolicies(data)); + return FutureUtil.waitForAll(applyPolicyTasks) + .thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic)) + .exceptionally(ex -> { + log.error("[{}] update namespace polices : {} error", this.getName(), data, ex); + throw FutureUtil.wrapToCompletionException(ex); + }); + } - updatePublishDispatcher(); + private CompletableFuture applyUpdatedNamespacePolicies(Policies namespaceLevelPolicies) { + return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter(namespaceLevelPolicies)); + } - updateResourceGroupLimiter(data); + private List> applyUpdatedTopicPolicies() { + List> applyPoliciesFutureList = new ArrayList<>(); - List> producerCheckFutures = new ArrayList<>(producers.size()); - producers.values().forEach(producer -> producerCheckFutures.add( + // Client permission check. + subscriptions.forEach((subName, sub) -> { + sub.getConsumers().forEach(consumer -> applyPoliciesFutureList.add(consumer.checkPermissionsAsync())); + }); + producers.values().forEach(producer -> applyPoliciesFutureList.add( producer.checkPermissionsAsync().thenRun(producer::checkEncryption))); + // Check message expiry. 
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkMessageExpiry())); - return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> { - return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> { - replicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - checkMessageExpiry(); - CompletableFuture replicationFuture = checkReplicationAndRetryOnFailure(); - CompletableFuture dedupFuture = checkDeduplicationStatus(); - CompletableFuture persistentPoliciesFuture = checkPersistencePolicies(); - return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture, - preCreateSubscriptionForCompactionIfNeeded()); - }); - }).exceptionally(ex -> { - log.error("[{}] update namespace polices : {} error", this.getName(), data, ex); - throw FutureUtil.wrapToCompletionException(ex); - }); + // Update rate limiters. + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateDispatchRateLimiter())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscribeRateLimiter())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updatePublishRateLimiter())); + + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscriptionsDispatcherRateLimiter())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread( + () -> replicators.forEach((name, replicator) -> replicator.updateRateLimiter()))); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread( + () -> shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter()))); + + // Other components. 
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkReplicationAndRetryOnFailure())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkDeduplicationStatus())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkPersistencePolicies())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread( + () -> preCreateSubscriptionForCompactionIfNeeded())); + + return applyPoliciesFutureList; } /** @@ -3778,42 +3799,30 @@ public void onUpdate(TopicPolicies policies) { if (policies == null) { return; } + // Update props. + // The component "EntryFilters" is update in the method "updateTopicPolicy(data)". + // see more detail: https://github.com/apache/pulsar/pull/19364. updateTopicPolicy(policies); shadowTopics = policies.getShadowTopics(); - updateDispatchRateLimiter(); checkReplicatedSubscriptionControllerState(); - updateSubscriptionsDispatcherRateLimiter().thenRun(() -> { - updatePublishDispatcher(); - updateSubscribeRateLimiter(); - replicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter()); - checkMessageExpiry(); - }) - .thenCompose(__ -> checkReplicationAndRetryOnFailure()) - .thenCompose(__ -> checkDeduplicationStatus()) - .thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded()) - .thenCompose(__ -> checkPersistencePolicies()) - .thenAccept(__ -> log.info("[{}] Policies updated successfully", topic)) - .exceptionally(e -> { - Throwable t = FutureUtil.unwrapCompletionException(e); - log.error("[{}] update topic policy error: {}", topic, t.getMessage(), t); - return null; - }); + + // Apply policies for components(not contains the specified policies which only defined in namespace policies). 
+ FutureUtil.waitForAll(applyUpdatedTopicPolicies()) + .thenAccept(__ -> log.info("[{}] topic-level policies updated successfully", topic)) + .exceptionally(e -> { + Throwable t = FutureUtil.unwrapCompletionException(e); + log.error("[{}] update topic-level policy error: {}", topic, t.getMessage(), t); + return null; + }); } - private CompletableFuture updateSubscriptionsDispatcherRateLimiter() { - List> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size()); + private void updateSubscriptionsDispatcherRateLimiter() { subscriptions.forEach((subName, sub) -> { - List> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size()); - sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync())); - subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> { - Dispatcher dispatcher = sub.getDispatcher(); - if (dispatcher != null) { - dispatcher.updateRateLimiter(); - } - })); + Dispatcher dispatcher = sub.getDispatcher(); + if (dispatcher != null) { + dispatcher.updateRateLimiter(); + } }); - return FutureUtil.waitForAll(subscriptionCheckFutures); } protected CompletableFuture initTopicPolicy() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 036d4354a5f1b..023b77a3dc088 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -42,6 +42,8 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ConfigHelper; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import 
org.apache.pulsar.broker.namespace.NamespaceService; @@ -50,6 +52,7 @@ import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -83,8 +86,11 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -3157,4 +3163,49 @@ public void testProduceChangesWithEncryptionRequired() throws Exception { }); } + @Test + public void testUpdateRetentionWithPartialFailure() throws Exception { + String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp"); + admin.topics().createNonPartitionedTopic(tpName); + + // Load topic up. + admin.topics().getInternalStats(tpName); + + // Inject an error that makes dispatch rate update fail. 
+ PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get(); + ConcurrentOpenHashMap subscriptions = + WhiteboxImpl.getInternalState(persistentTopic, "subscriptions"); + PersistentSubscription mockedSubscription = Mockito.mock(PersistentSubscription.class); + Mockito.when(mockedSubscription.getDispatcher()).thenThrow(new RuntimeException("Mocked error: getDispatcher")); + subscriptions.put("mockedSubscription", mockedSubscription); + + // Update namespace-level retention policies. + RetentionPolicies retentionPolicies1 = new RetentionPolicies(1, 1); + admin.namespaces().setRetentionAsync(myNamespace, retentionPolicies1); + + // Verify: update retention will be success even if other component update throws exception. + Awaitility.await().untilAsserted(() -> { + ManagedLedgerImpl ML = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + assertEquals(ML.getConfig().getRetentionSizeInMB(), 1); + assertEquals(ML.getConfig().getRetentionTimeMillis(), 1 * 60 * 1000); + }); + + // Update topic-level retention policies. + RetentionPolicies retentionPolicies2 = new RetentionPolicies(2, 2); + admin.topics().setRetentionAsync(tpName, retentionPolicies2); + + // Verify: update retention will be success even if other component update throws exception. + Awaitility.await().untilAsserted(() -> { + ManagedLedgerImpl ML = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + assertEquals(ML.getConfig().getRetentionSizeInMB(), 2); + assertEquals(ML.getConfig().getRetentionTimeMillis(), 2 * 60 * 1000); + }); + + // Cleanup. 
+ subscriptions.clear(); + admin.namespaces().removeRetention(myNamespace); + admin.topics().delete(tpName, false); + } + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 2b082b4a7899b..6f62589853593 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.util; +import com.google.common.util.concurrent.MoreExecutors; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -55,6 +56,11 @@ public static CompletableFuture waitForAll(Collection runWithCurrentThread(Runnable runnable) { + return CompletableFuture.runAsync( + () -> runnable.run(), MoreExecutors.directExecutor()); + } + public static CompletableFuture> waitForAll(Stream>> futures) { return futures.reduce(CompletableFuture.completedFuture(new ArrayList<>()), (pre, curr) -> pre.thenCompose(preV -> curr.thenApply(currV -> {