From 742b85a78515137edfb34d77a1db0453f7e601a0 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sat, 14 Dec 2024 01:17:08 +0800 Subject: [PATCH 1/2] Verify is read only before revoke permissions on topic --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9a306f6b4fff7..f9d96758fc663 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -298,6 +298,7 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str } validateAccessForTenantCf + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; From 46998400c57c1847de3f9b7a8a8d49cc3ffd3ad0 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 16 Dec 2024 16:25:46 +0800 Subject: [PATCH 2/2] Add tests --- .../admin/impl/PersistentTopicsBase.java | 4 +-- .../broker/admin/PersistentTopicsTest.java | 30 +++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f9d96758fc663..1300cd3449c27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -289,7 +289,8 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges CompletableFuture validateAccessForTenantCf = - validateAdminAccessForTenantAsync(namespaceName.getTenant()); + validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()); var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); if (checkIfTopicExists) { @@ -298,7 +299,6 @@ protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, Str } validateAccessForTenantCf - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index cca5049ed50eb..302948903442c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1030,6 +1030,36 @@ public void testRevokePartitionedTopic() { } } + @Test + public void testRevokePartitionedTopicWithReadonlyPolicies() throws Exception { + final String partitionedTopicName = "testRevokePartitionedTopicWithReadonlyPolicies-topic"; + final int numPartitions = 5; + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic( + response, testTenant, testNamespace, partitionedTopicName, numPartitions, true); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + String role = "role"; + Set expectActions = new HashSet<>(); + expectActions.add(AuthAction.produce); + response = mock(AsyncResponse.class); + responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role, + expectActions); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + response = mock(AsyncResponse.class); + doReturn(CompletableFuture.failedFuture( + new RestException(Response.Status.FORBIDDEN, "Broker is forbidden to do read-write operations")) + ).when(persistentTopics).validatePoliciesReadOnlyAccessAsync(); + persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(exceptionCaptor.capture()); + Assert.assertEquals(exceptionCaptor.getValue().getResponse().getStatus(), + Response.Status.FORBIDDEN.getStatusCode()); + } + @Test public void testTriggerCompactionTopic() { final String partitionTopicName = "test-part";