From a95c175e500147865b5c1d8f3cd3e59b61884d92 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 20 Feb 2025 15:20:42 +0800 Subject: [PATCH] [improve][broker] Remove enableReplicatedSubscriptions config Signed-off-by: Zixuan Liu --- conf/broker.conf | 3 - .../terraform-ansible/templates/broker.conf | 3 - .../pulsar/broker/ServiceConfiguration.java | 5 -- .../persistent/GeoPersistentReplicator.java | 7 +- .../persistent/PersistentSubscription.java | 9 +- .../service/persistent/PersistentTopic.java | 13 +-- ...econnectZKClientPulsarServiceBaseTest.java | 1 - ...licationWithConfigurationSyncTestBase.java | 1 - .../broker/service/NetworkErrorTestBase.java | 1 - .../service/OneWayReplicatorTestBase.java | 1 - .../broker/service/ReplicatorTestBase.java | 2 - .../ReplicatedSubscriptionConfigTest.java | 18 ---- .../broker/transaction/TransactionTest.java | 1 - ...ReplicatedSubscriptionsIsDisabledTest.java | 83 ------------------- 14 files changed, 5 insertions(+), 143 deletions(-) delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/EnableReplicatedSubscriptionsIsDisabledTest.java diff --git a/conf/broker.conf b/conf/broker.conf index f68306ec7b4d7..b9345725280b1 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -661,9 +661,6 @@ delayedDeliveryMaxDelayInMillis=0 # Whether to enable acknowledge of batch local index. acknowledgmentAtBatchIndexLevelEnabled=false -# Enable tracking of replicated subscriptions state across clusters. -enableReplicatedSubscriptions=true - # Frequency of snapshots for replicated subscriptions tracking. replicatedSubscriptionsSnapshotFrequencyMillis=1000 diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index bae55cb69f1ee..b0f4d3983dc40 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -390,9 +390,6 @@ delayedDeliveryTickTimeMillis=1000 # Whether to enable acknowledge of batch local index. acknowledgmentAtBatchIndexLevelEnabled=false -# Enable tracking of replicated subscriptions state across clusters. -enableReplicatedSubscriptions=true - # Frequency of snapshots for replicated subscriptions tracking. replicatedSubscriptionsSnapshotFrequencyMillis=1000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 37ef1cfaf62ae..63d48c2ec2f09 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1426,11 +1426,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING) private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE; - @FieldContext( - category = CATEGORY_SERVER, - doc = "Enable tracking of replicated subscriptions state across clusters.") - private boolean enableReplicatedSubscriptions = true; - @FieldContext( category = CATEGORY_SERVER, doc = "Frequency of snapshots for replicated subscriptions tracking.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index cd5b2ba721215..b2d02e823079d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -82,9 +82,6 @@ protected CompletableFuture prepareCreateProducer() { @Override protected boolean replicateEntries(List entries) { boolean atLeastOneMessageSentForReplication = false; - boolean isEnableReplicatedSubscriptions = - brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions(); - try { // This flag is set to true when we skip at least one local message, // in order to skip remaining local messages. @@ -128,9 +125,7 @@ protected boolean replicateEntries(List entries) { } } - if (isEnableReplicatedSubscriptions) { - checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload); - } + checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload); if (msg.isReplicated()) { // Discard messages that were already replicated into this region diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 275d1ae5818b0..d01c98ae7c9ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -206,7 +206,7 @@ public boolean isReplicated() { public boolean setReplicated(boolean replicated) { replicatedControlled = replicated; - if (!replicated || !config.isEnableReplicatedSubscriptions()) { + if (!replicated) { this.replicatedSubscriptionSnapshotCache = null; } else if (this.replicatedSubscriptionSnapshotCache == null) { this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName, @@ -215,12 +215,7 @@ public boolean setReplicated(boolean replicated) { if (this.cursor != null) { if (replicated) { - if (!config.isEnableReplicatedSubscriptions()) { - log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the " - + "configuration enableReplicatedSubscriptions", topicName, subName, replicated); - } else { - return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); - } + return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); } else { return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ed05e47ed38e3..d842a1c6180e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -925,12 +925,6 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { Boolean replicatedSubscriptionState = replicatedSubscriptionStateArg; - if (replicatedSubscriptionState != null && replicatedSubscriptionState - && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { - log.warn("[{}] Replicated Subscription is disabled by broker.", getName()); - replicatedSubscriptionState = false; - } - if (subType == SubType.Key_Shared && !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) { return FutureUtil.failedFuture( @@ -4066,16 +4060,13 @@ public synchronized void checkReplicatedSubscriptionControllerState() { private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) { boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent(); - boolean isEnableReplicatedSubscriptions = - brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions(); boolean replicationEnabled = this.topicPolicies.getReplicationClusters().get().size() > 1; - if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions && replicationEnabled) { + if (shouldBeEnabled && !isCurrentlyEnabled && replicationEnabled) { log.info("[{}] Enabling replicated subscriptions controller", topic); replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this, brokerService.pulsar().getConfiguration().getClusterName())); - } else if (isCurrentlyEnabled && (!shouldBeEnabled || !isEnableReplicatedSubscriptions - || !replicationEnabled)) { + } else if (isCurrentlyEnabled && (!shouldBeEnabled || !replicationEnabled)) { log.info("[{}] Disabled replicated subscriptions controller", topic); replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); replicatedSubscriptionsController = Optional.empty(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java index 787b4d3154e90..7dee692ac6f3c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java @@ -193,7 +193,6 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName, config.setBacklogQuotaCheckIntervalInSeconds(5); config.setDefaultNumberOfNamespaceBundles(1); config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - config.setEnableReplicatedSubscriptions(true); config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); config.setTlsTrustCertsFilePath(caCertPath); config.setTlsCertificateFilePath(brokerCertPath); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java index 1362a046247d8..b4574e79e50a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java @@ -179,7 +179,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName config.setBacklogQuotaCheckIntervalInSeconds(5); config.setDefaultNumberOfNamespaceBundles(1); config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - config.setEnableReplicatedSubscriptions(true); config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); config.setLoadBalancerSheddingEnabled(false); config.setTlsTrustCertsFilePath(caCertPath); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java index 0161a4a63cfc6..565cd8b652df0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java @@ -177,7 +177,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName config.setBacklogQuotaCheckIntervalInSeconds(5); config.setDefaultNumberOfNamespaceBundles(1); config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - config.setEnableReplicatedSubscriptions(true); config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); config.setLoadBalancerSheddingEnabled(false); config.setForceDeleteNamespaceAllowed(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 200c8dd3b3d9f..33e64f6d8ff69 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -266,7 +266,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName config.setBacklogQuotaCheckIntervalInSeconds(5); config.setDefaultNumberOfNamespaceBundles(1); config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - config.setEnableReplicatedSubscriptions(true); config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); config.setLoadBalancerSheddingEnabled(false); config.setForceDeleteNamespaceAllowed(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 33877b681184f..831d8effe8e01 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -342,7 +342,6 @@ public void setConfig2DefaultValue() { public void setConfig4DefaultValue() { setConfigDefaults(config4, cluster4, bkEnsemble4); - config4.setEnableReplicatedSubscriptions(false); } private void setConfigDefaults(ServiceConfiguration config, String clusterName, @@ -373,7 +372,6 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName, config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config.setDefaultNumberOfNamespaceBundles(1); config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - config.setEnableReplicatedSubscriptions(true); config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); config.setLoadManagerClassName(getLoadManagerClassName()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java index 604326203e876..86ba6fb51618c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java @@ -55,7 +55,6 @@ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder p @Test public void createReplicatedSubscription() throws Exception { - this.conf.setEnableReplicatedSubscriptions(true); String topic = BrokerTestUtil.newUniqueName("createReplicatedSubscription"); @Cleanup @@ -77,7 +76,6 @@ public void createReplicatedSubscription() throws Exception { @Test public void upgradeToReplicatedSubscription() throws Exception { - this.conf.setEnableReplicatedSubscriptions(true); String topic = BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscription"); Consumer consumer = pulsarClient.newConsumer(Schema.STRING) @@ -103,7 +101,6 @@ public void upgradeToReplicatedSubscription() throws Exception { @Test public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception { - this.conf.setEnableReplicatedSubscriptions(true); String topic = BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscriptionAfterRestart"); Consumer consumer = pulsarClient.newConsumer(Schema.STRING) @@ -128,19 +125,4 @@ public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception { assertTrue(stats.getSubscriptions().get("sub").isReplicated()); consumer.close(); } - - @Test - public void testDisableReplicatedSubscriptions() throws Exception { - this.conf.setEnableReplicatedSubscriptions(false); - String topic = BrokerTestUtil.newUniqueName("disableReplicatedSubscriptions"); - Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topic(topic) - .subscriptionName("sub") - .replicateSubscriptionState(true) - .subscribe(); - - TopicStats stats = admin.topics().getStats(topic); - assertFalse(stats.getSubscriptions().get("sub").isReplicated()); - consumer.close(); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 35c9048ebb554..b3eea2e696c35 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1582,7 +1582,6 @@ public void testTBRecoverChangeStateError() throws InterruptedException, Timeout ScheduledExecutorService executorServiceRecover = mock(ScheduledExecutorService.class); // Mock serviceConfiguration. ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class); - when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false); when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true); // Mock executorProvider. ExecutorProvider executorProvider = mock(ExecutorProvider.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/EnableReplicatedSubscriptionsIsDisabledTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/EnableReplicatedSubscriptionsIsDisabledTest.java deleted file mode 100644 index d002261cee4a3..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/EnableReplicatedSubscriptionsIsDisabledTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import lombok.Cleanup; -import org.apache.pulsar.broker.service.Subscription; -import org.apache.pulsar.broker.service.Topic; -import org.apache.pulsar.broker.service.persistent.PersistentSubscription; -import org.apache.pulsar.common.naming.TopicName; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -@Test(groups = "broker-api") -public class EnableReplicatedSubscriptionsIsDisabledTest extends ProducerConsumerBase { - - @BeforeClass - @Override - protected void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - } - - @Override - protected void doInitConf() throws Exception { - super.doInitConf(); - conf.setEnableReplicatedSubscriptions(false); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - - @Test - public void testReplicateSubscriptionStateIsEnabled() throws Exception { - String topicName = TopicName.get("my-property/my-ns/testReplicateSubscriptionStateIsEnabled").toString(); - String subName = "my-subscription"; - @Cleanup - Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) - .topic(topicName) - .subscriptionName(subName) - .replicateSubscriptionState(true) - .subscribe(); - CompletableFuture> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topicName); - assertThat(topicIfExists) - .succeedsWithin(3, TimeUnit.SECONDS) - .matches(optionalTopic -> { - assertTrue(optionalTopic.isPresent()); - Topic topicRef = optionalTopic.get(); - Subscription subscription = topicRef.getSubscription(subName); - assertNotNull(subscription); - assertTrue(subscription instanceof PersistentSubscription); - PersistentSubscription persistentSubscription = (PersistentSubscription) subscription; - assertEquals(persistentSubscription.getReplicatedControlled(), Boolean.FALSE); - return true; - }); - } -}