Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[improve][broker] Remove enableReplicatedSubscriptions config #36

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,6 @@ delayedDeliveryMaxDelayInMillis=0
# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false

# Enable tracking of replicated subscriptions state across clusters.
enableReplicatedSubscriptions=true

# Frequency of snapshots for replicated subscriptions tracking.
replicatedSubscriptionsSnapshotFrequencyMillis=1000

Expand Down
3 changes: 0 additions & 3 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,6 @@ delayedDeliveryTickTimeMillis=1000
# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false

# Enable tracking of replicated subscriptions state across clusters.
enableReplicatedSubscriptions=true

# Frequency of snapshots for replicated subscriptions tracking.
replicatedSubscriptionsSnapshotFrequencyMillis=1000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1426,11 +1426,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable tracking of replicated subscriptions state across clusters.")
private boolean enableReplicatedSubscriptions = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Frequency of snapshots for replicated subscriptions tracking.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ protected CompletableFuture<Void> prepareCreateProducer() {
@Override
protected boolean replicateEntries(List<Entry> entries) {
boolean atLeastOneMessageSentForReplication = false;
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();

try {
// This flag is set to true when we skip at least one local message,
// in order to skip remaining local messages.
Expand Down Expand Up @@ -128,9 +125,7 @@ protected boolean replicateEntries(List<Entry> entries) {
}
}

if (isEnableReplicatedSubscriptions) {
checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
}
checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);

if (msg.isReplicated()) {
// Discard messages that were already replicated into this region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public boolean isReplicated() {
public boolean setReplicated(boolean replicated) {
replicatedControlled = replicated;

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
if (!replicated) {
this.replicatedSubscriptionSnapshotCache = null;
} else if (this.replicatedSubscriptionSnapshotCache == null) {
this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
Expand All @@ -215,12 +215,7 @@ public boolean setReplicated(boolean replicated) {

if (this.cursor != null) {
if (replicated) {
if (!config.isEnableReplicatedSubscriptions()) {
log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the "
+ "configuration enableReplicatedSubscriptions", topicName, subName, replicated);
} else {
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
} else {
return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,12 +925,6 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
Boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;
if (replicatedSubscriptionState != null && replicatedSubscriptionState
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
replicatedSubscriptionState = false;
}

if (subType == SubType.Key_Shared
&& !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
return FutureUtil.failedFuture(
Expand Down Expand Up @@ -4066,16 +4060,13 @@ public synchronized void checkReplicatedSubscriptionControllerState() {

private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) {
boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent();
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
boolean replicationEnabled = this.topicPolicies.getReplicationClusters().get().size() > 1;

if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions && replicationEnabled) {
if (shouldBeEnabled && !isCurrentlyEnabled && replicationEnabled) {
log.info("[{}] Enabling replicated subscriptions controller", topic);
replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this,
brokerService.pulsar().getConfiguration().getClusterName()));
} else if (isCurrentlyEnabled && (!shouldBeEnabled || !isEnableReplicatedSubscriptions
|| !replicationEnabled)) {
} else if (isCurrentlyEnabled && (!shouldBeEnabled || !replicationEnabled)) {
log.info("[{}] Disabled replicated subscriptions controller", topic);
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
replicatedSubscriptionsController = Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName,
config.setBacklogQuotaCheckIntervalInSeconds(5);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setTlsTrustCertsFilePath(caCertPath);
config.setTlsCertificateFilePath(brokerCertPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setBacklogQuotaCheckIntervalInSeconds(5);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadBalancerSheddingEnabled(false);
config.setTlsTrustCertsFilePath(caCertPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setBacklogQuotaCheckIntervalInSeconds(5);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadBalancerSheddingEnabled(false);
config.setForceDeleteNamespaceAllowed(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setBacklogQuotaCheckIntervalInSeconds(5);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadBalancerSheddingEnabled(false);
config.setForceDeleteNamespaceAllowed(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ public void setConfig2DefaultValue() {

public void setConfig4DefaultValue() {
setConfigDefaults(config4, cluster4, bkEnsemble4);
config4.setEnableReplicatedSubscriptions(false);
}

private void setConfigDefaults(ServiceConfiguration config, String clusterName,
Expand Down Expand Up @@ -373,7 +372,6 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName,
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config.setDefaultNumberOfNamespaceBundles(1);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadManagerClassName(getLoadManagerClassName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder p

@Test
public void createReplicatedSubscription() throws Exception {
this.conf.setEnableReplicatedSubscriptions(true);
String topic = BrokerTestUtil.newUniqueName("createReplicatedSubscription");

@Cleanup
Expand All @@ -77,7 +76,6 @@ public void createReplicatedSubscription() throws Exception {

@Test
public void upgradeToReplicatedSubscription() throws Exception {
this.conf.setEnableReplicatedSubscriptions(true);
String topic = BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscription");

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
Expand All @@ -103,7 +101,6 @@ public void upgradeToReplicatedSubscription() throws Exception {

@Test
public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception {
this.conf.setEnableReplicatedSubscriptions(true);
String topic = BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscriptionAfterRestart");

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
Expand All @@ -128,19 +125,4 @@ public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception {
assertTrue(stats.getSubscriptions().get("sub").isReplicated());
consumer.close();
}

@Test
public void testDisableReplicatedSubscriptions() throws Exception {
this.conf.setEnableReplicatedSubscriptions(false);
String topic = BrokerTestUtil.newUniqueName("disableReplicatedSubscriptions");
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
.replicateSubscriptionState(true)
.subscribe();

TopicStats stats = admin.topics().getStats(topic);
assertFalse(stats.getSubscriptions().get("sub").isReplicated());
consumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,6 @@ public void testTBRecoverChangeStateError() throws InterruptedException, Timeout
ScheduledExecutorService executorServiceRecover = mock(ScheduledExecutorService.class);
// Mock serviceConfiguration.
ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false);
when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true);
// Mock executorProvider.
ExecutorProvider executorProvider = mock(ExecutorProvider.class);
Expand Down

This file was deleted.

Loading