Skip to content

Commit

Permalink
add implemetation
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Sep 29, 2024
1 parent 6877b7c commit d18afa6
Show file tree
Hide file tree
Showing 12 changed files with 405 additions and 144 deletions.
144 changes: 0 additions & 144 deletions pip/pip-383.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -223,6 +226,16 @@ CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespac
CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
String authDataJson);

default CompletableFuture<Void> grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("grantPermissionAsync is not supported by the Authorization")));
}

default CompletableFuture<Void> revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("revokePermissionAsync is not supported by the Authorization")));
}


/**
* Revoke authorization-action permission on a topic to the given client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.util.concurrent.TimeUnit.SECONDS;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,6 +33,8 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -181,6 +184,14 @@ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<Aut
return provider.grantPermissionAsync(topicName, actions, role, authDataJson);
}

public CompletableFuture<Void> grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
return provider.grantPermissionAsync(options);
}

public CompletableFuture<Void> revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
return provider.revokePermissionAsync(options);
}

/**
* Revoke authorization-action permission on a topic to the given client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,6 +33,8 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -251,6 +254,68 @@ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<Aut
});
}

public CompletableFuture<Void> grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
TopicName topicName = TopicName.get(options.get(0).getTopic());
return pulsarResources.getNamespaceResources()
.setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
options.stream().forEach(o -> {
final String topicUri = TopicName.get(o.getTopic()).toString();
policies.auth_policies.getTopicAuthentication()
.computeIfAbsent(topicUri, __ -> new HashMap<>())
.put(o.getRole(), o.getActions());
});
return policies;
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to grant permissions for {}", options);
} else {
log.info("Successfully granted access for {}", options);
}
});
});
}

@Override
public CompletableFuture<Void> revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
TopicName topicName = TopicName.get(options.get(0).getTopic());
return pulsarResources.getNamespaceResources()
.setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
options.stream().forEach(o -> {
final String topicUri = TopicName.get(o.getTopic()).toString();
policies.auth_policies.getTopicAuthentication()
.computeIfPresent(topicUri, (topicNameUri, roles) -> {
roles.remove(o.getRole());
if (roles.isEmpty()) {
return null;
}
return roles;
});
});
return policies;
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to revoke permissions for {}", options, ex);
} else {
log.info("Successfully revoke permissions for {}", options);
}
});
});
}

@Override
public CompletableFuture<Void> revokePermissionAsync(TopicName topicName, String role) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,4 +924,15 @@ protected void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
"The bucket must be specified for namespace offload.");
}
}

protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
return pulsar().getNamespaceService().checkTopicExists(topicName)
.thenAccept(info -> {
boolean exists = info.isExists();
info.recycle();
if (!exists) {
throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
}
});
}
}
Loading

0 comments on commit d18afa6

Please # to comment.