Skip to content

Commit

Permalink
[fix][broker] Skip topic auto-creation for ExtensibleLoadManager inte…
Browse files Browse the repository at this point in the history
…rnal topics (apache#21729)
  • Loading branch information
heesung-sn authored Dec 15, 2023
1 parent c4cff0a commit 88df040
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ public void close() throws PulsarServerException {
}
}

private boolean isInternalTopic(String topic) {
public static boolean isInternalTopic(String topic) {
return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
Expand Down Expand Up @@ -3288,10 +3288,10 @@ private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName
return CompletableFuture.completedFuture(false);
}

// ServiceUnitStateChannelImpl.TOPIC expects to be a non-partitioned-topic now.
// ExtensibleLoadManagerImpl.internal topics expects to be non-partitioned-topics now.
// We don't allow the auto-creation here.
// ServiceUnitStateChannelImpl.start() is responsible to create the topic.
if (ServiceUnitStateChannelImpl.TOPIC.equals(topicName.toString())) {
// ExtensibleLoadManagerImpl.start() is responsible to create the internal system topics.
if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
return CompletableFuture.completedFuture(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,27 @@


import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -526,4 +535,58 @@ public void testDynamicConfigurationTopicAutoCreationPartitionedWhenDefaultMoreT
}
}

@Test
public void testExtensibleLoadManagerImplInternalTopicAutoCreations()
throws PulsarAdminException, PulsarClientException {
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setDefaultNumPartitions(3);
pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(5);
final String namespaceName = NamespaceName.SYSTEM_NAMESPACE.toString();
TenantInfoImpl tenantInfo = new TenantInfoImpl();
tenantInfo.setAllowedClusters(Set.of(configClusterName));
admin.tenants().createTenant("pulsar", tenantInfo);
admin.namespaces().createNamespace(namespaceName);
admin.topics().createNonPartitionedTopic(ServiceUnitStateChannelImpl.TOPIC);
admin.topics().createNonPartitionedTopic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC);
admin.topics().createNonPartitionedTopic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);

// clear the topics to test the auto creation of non-persistent topics.
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
pulsar.getBrokerService().getTopics();
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> oldTopics = new ConcurrentOpenHashMap<>();
topics.forEach((key, val) -> oldTopics.put(key, val));
topics.clear();

// The created persistent topic correctly can be found by
// pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
Producer producer = pulsarClient.newProducer().topic(ServiceUnitStateChannelImpl.TOPIC).create();

// The created non-persistent topics cannot be found, as we did topics.clear()
try {
pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC).create();
Assert.fail("Create should have failed.");
} catch (PulsarClientException.TopicDoesNotExistException e) {
// expected
}
try {
pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC).create();
Assert.fail("Create should have failed.");
} catch (PulsarClientException.TopicDoesNotExistException e) {
// expected
}

oldTopics.forEach((key, val) -> topics.put(key, val));

Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName);
assertEquals(partitionedTopicList.size(), 0);
});

producer.close();
admin.namespaces().deleteNamespace(namespaceName);
admin.tenants().deleteTenant("pulsar");

}

}

0 comments on commit 88df040

Please # to comment.