Skip to content

Commit

Permalink
[improve][admin] Add clusters check when set replication clusters (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored and srinath-ctds committed Dec 20, 2023
1 parent 819de96 commit 7ae661e
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,9 @@ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<S
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenApply(__ -> {
checkNotNull(clusterIds, "ClusterIds should not be null");
if (CollectionUtils.isEmpty(clusterIds)) {
throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty");
}
if (!namespaceName.isGlobal() && !(clusterIds.size() == 1
&& clusterIds.get(0).equals(pulsar().getConfiguration().getClusterName()))) {
throw new RestException(Status.PRECONDITION_FAILED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3349,6 +3349,9 @@ protected CompletableFuture<Void> internalSetReplicationClusters(List<String> cl
return validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
if (CollectionUtils.isEmpty(clusterIds)) {
throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty");
}
Set<String> replicationClusters = Sets.newHashSet(clusterIds);
if (replicationClusters.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1440,19 +1440,10 @@ public void testClusterIsReadyBeforeCreateTopic() throws Exception {
admin.namespaces().createNamespace(defaultTenant + "/ns2");
// By default the cluster will configure as configuration file. So the create topic operation
// will never throw exception except there is no cluster.
admin.namespaces().setNamespaceReplicationClusters(defaultTenant + "/ns2", new HashSet<String>());
admin.namespaces().setNamespaceReplicationClusters(defaultTenant + "/ns2", Sets.newHashSet(configClusterName));

try {
admin.topics().createPartitionedTopic(persistentPartitionedTopicName, partitions);
Assert.fail("should have failed due to Namespace does not have any clusters configured");
} catch (PulsarAdminException.PreconditionFailedException ignored) {
}

try {
admin.topics().createPartitionedTopic(NonPersistentPartitionedTopicName, partitions);
Assert.fail("should have failed due to Namespace does not have any clusters configured");
} catch (PulsarAdminException.PreconditionFailedException ignored) {
}
admin.topics().createPartitionedTopic(persistentPartitionedTopicName, partitions);
admin.topics().createPartitionedTopic(NonPersistentPartitionedTopicName, partitions);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -1294,6 +1295,14 @@ public void testForceDeleteNamespace() throws Exception {
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
}

@Test
public void testSetNamespaceReplicationCluters() throws Exception {
String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace");
admin.namespaces().createNamespace(namespace, 100);
assertThrows(PulsarAdminException.PreconditionFailedException.class,
() -> admin.namespaces().setNamespaceReplicationClusters(namespace, Set.of()));
}

@Test
public void testForceDeleteNamespaceNotAllowed() throws Exception {
assertFalse(pulsar.getConfiguration().isForceDeleteNamespaceAllowed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -2990,6 +2991,10 @@ public void testReplicatorClusterApi() throws Exception {
admin.topics().removeReplicationClusters(topic);
Awaitility.await().untilAsserted(()
-> assertNull(admin.topics().getReplicationClusters(topic, false)));

assertThrows(PulsarAdminException.PreconditionFailedException.class, () -> admin.topics().setReplicationClusters(topic, List.of()));
assertThrows(PulsarAdminException.PreconditionFailedException.class, () -> admin.topics().setReplicationClusters(topic, null));

}

@Test
Expand Down

0 comments on commit 7ae661e

Please # to comment.