Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Adds PubsubAdmin #69

Merged
merged 14 commits into from
Jul 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions spring-cloud-gcp-pubsub/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
Expand Down Expand Up @@ -33,15 +34,17 @@
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-core</artifactId>
</dependency>

<!-- Tests -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-core</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Copyright 2017 original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gcp.pubsub;

import java.io.IOException;
import java.util.List;

import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.PagedResponseWrappers;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.common.collect.Lists;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;

import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
import org.springframework.util.Assert;

/**
* Pub/Sub admin utility that creates new topics and subscriptions on Google Cloud Pub/Sub.
*
* @author João André Martins
*/
public class PubsubAdmin {

private final String projectId;

private final TopicAdminClient topicAdminClient;

private final SubscriptionAdminClient subscriptionAdminClient;

/** Default inspired in the subscription creation web UI. */
private int defaultAckDeadline = 10;

/**
* This constructor instantiates TopicAdminClient and SubscriptionAdminClient with all their
* defaults and the provided credentials provider.
*/
public PubsubAdmin(GcpProjectIdProvider projectIdProvider,
CredentialsProvider credentialsProvider) throws IOException {
this(projectIdProvider,
TopicAdminClient.create(
TopicAdminSettings.defaultBuilder()
.setCredentialsProvider(credentialsProvider)
.build()),
SubscriptionAdminClient.create(
SubscriptionAdminSettings.defaultBuilder()
.setCredentialsProvider(credentialsProvider)
.build()));
}

public PubsubAdmin(GcpProjectIdProvider projectIdProvider, TopicAdminClient topicAdminClient,
SubscriptionAdminClient subscriptionAdminClient) {
Assert.notNull(projectIdProvider, "The project ID provider can't be null.");
Assert.notNull(topicAdminClient, "The topic administration client can't be null");
Assert.notNull(subscriptionAdminClient,
"The subscription administration client can't be null");

this.projectId = projectIdProvider.getProjectId();
Assert.hasText(this.projectId, "The project ID can't be null or empty.");
this.topicAdminClient = topicAdminClient;
this.subscriptionAdminClient = subscriptionAdminClient;
}

/**
* Create a new topic on Google Cloud Pub/Sub.
*
* @param topicName the name for the new topic
* @return the created topic
*/
public Topic createTopic(String topicName) {
Assert.hasText(topicName, "No topic name was specified.");

return this.topicAdminClient.createTopic(TopicName.create(this.projectId, topicName));
}

/**
* Delete a topic from Google Cloud Pub/Sub.
*
* @param topicName the name of the topic to be deleted
*/
public void deleteTopic(String topicName) {
Assert.hasText(topicName, "No topic name was specified.");

this.topicAdminClient.deleteTopic(TopicName.create(this.projectId, topicName));
}

/**
* Return every topic in a project.
*
* <p>If there are multiple pages, they will all be merged into the same result.
*/
public List<Topic> listTopics() {
PagedResponseWrappers.ListTopicsPagedResponse topicListPage =
this.topicAdminClient.listTopics(ProjectName.create(this.projectId));

return Lists.newArrayList(topicListPage.iterateAll());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the local variable topicListPage seems to imply that it only contains a single page of results. Are you sure that iterateAll() actually return all results rather than a single page? If so, maybe rename topicListPage to just topicsList.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, my comment about the variable name still applies I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make sense for a local variable containing the result of iterateAll() to be called topicsList, not the topicAdminClient.listTopics() result. Also, the object type is ListTopicsPagedResponse, which also hints to it being a page, rather than the whole thing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"paged response" does not necessarily imply "page". You can also call it verbosely listTopicsPagedResponse. In any case, it's so minor, it's not worth discussing further. :)

}

/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @return the created subscription
*/
public Subscription createSubscription(String subscriptionName, String topicName) {
return createSubscription(subscriptionName, topicName, null, null);
}

/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param ackDeadline deadline in seconds before a message is resent. If not provided, set to
* default of 10 seconds
* @return the created subscription
*/
public Subscription createSubscription(String subscriptionName, String topicName,
Integer ackDeadline) {
return createSubscription(subscriptionName, topicName, ackDeadline, null);
}

/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param pushEndpoint URL of the service receiving the push messages. If not provided, uses
* message pulling by default
* @return the created subscription
*/
public Subscription createSubscription(String subscriptionName, String topicName,
String pushEndpoint) {
return createSubscription(subscriptionName, topicName, null, pushEndpoint);
}

/**
* Create a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param ackDeadline deadline in seconds before a message is resent. If not provided, set to
* default of 10 seconds
* @param pushEndpoint URL of the service receiving the push messages. If not provided, uses
* message pulling by default
* @return the created subscription
*/
public Subscription createSubscription(String subscriptionName, String topicName,
Integer ackDeadline, String pushEndpoint) {
Assert.hasText(subscriptionName, "No subscription name was specified.");
Assert.hasText(topicName, "No topic name was specified.");

int finalAckDeadline = this.defaultAckDeadline;
if (ackDeadline != null) {
Assert.isTrue(ackDeadline >= 0,
"The acknowledgement deadline value can't be negative.");
finalAckDeadline = ackDeadline;
}

PushConfig.Builder pushConfigBuilder = PushConfig.newBuilder();
if (pushEndpoint != null) {
pushConfigBuilder.setPushEndpoint(pushEndpoint);
}

return this.subscriptionAdminClient.createSubscription(
SubscriptionName.create(this.projectId, subscriptionName),
TopicName.create(this.projectId, topicName),
pushConfigBuilder.build(),
finalAckDeadline);
}

/**
* Delete a subscription from Google Cloud Pub/Sub.
*
* @param subscriptionName
*/
public void deleteSubscription(String subscriptionName) {
Assert.hasText(subscriptionName, "No subscription name was specified");

this.subscriptionAdminClient.deleteSubscription(
SubscriptionName.create(this.projectId, subscriptionName));
}

/**
* Return every subscription in a project.
*
* <p>If there are multiple pages, they will all be merged into the same result.
*/
public List<Subscription> listSubscriptions() {
PagedResponseWrappers.ListSubscriptionsPagedResponse subscriptionsPage =
this.subscriptionAdminClient.listSubscriptions(ProjectName.create(this.projectId));

return Lists.newArrayList(subscriptionsPage.iterateAll());
}

/**
* @return the default acknowledgement deadline value in seconds
*/
public int getDefaultAckDeadline() {
return this.defaultAckDeadline;
}

/**
* Set the default acknowledgement deadline value.
*
* @param defaultAckDeadline default acknowledgement deadline value in seconds
*/
public void setDefaultAckDeadline(int defaultAckDeadline) {
Assert.isTrue(defaultAckDeadline >= 0,
"The acknowledgement deadline value can't be negative.");

this.defaultAckDeadline = defaultAckDeadline;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2017 original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gcp.pubsub;

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

/**
* @author João André Martins
*/
@RunWith(MockitoJUnitRunner.class)
public class PubsubAdminTests {

@Mock
private TopicAdminClient mockTopicAdminClient;

@Mock
private SubscriptionAdminClient mockSubscriptionAdminClient;

@Test(expected = IllegalArgumentException.class)
public void testNewPubsubAdmin_nullProjectProvider() {
new PubsubAdmin(null, this.mockTopicAdminClient, this.mockSubscriptionAdminClient);
}

@Test(expected = IllegalArgumentException.class)
public void testNewPubsubAdmin_nullTopicAdminClient() {
new PubsubAdmin(() -> "test-project", null, this.mockSubscriptionAdminClient);
}

@Test(expected = IllegalArgumentException.class)
public void testNewPubsubAdmin_nullSubscriptionAdminClient() {
new PubsubAdmin(() -> "test-project", this.mockTopicAdminClient, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package org.springframework.cloud.gcp.pubsub.autoconfig;

import java.io.IOException;
import java.util.concurrent.Executors;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.FixedExecutorProvider;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;

import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -31,6 +34,8 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
import org.springframework.cloud.gcp.core.autoconfig.GcpContextAutoConfiguration;
import org.springframework.cloud.gcp.pubsub.PubsubAdmin;
import org.springframework.cloud.gcp.pubsub.core.PubsubException;
import org.springframework.cloud.gcp.pubsub.core.PubsubTemplate;
import org.springframework.cloud.gcp.pubsub.support.DefaultPublisherFactory;
import org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory;
Expand Down Expand Up @@ -112,4 +117,42 @@ public PublisherFactory defaultPublisherFactory(GcpProjectIdProvider projectIdPr
return new DefaultPublisherFactory(projectIdProvider, subscriberProvider, channelProvider,
credentialsProvider);
}

@Bean
@ConditionalOnMissingBean
public PubsubAdmin pubsubAdmin(GcpProjectIdProvider projectIdProvider,
TopicAdminClient topicAdminClient,
SubscriptionAdminClient subscriptionAdminClient) {
return new PubsubAdmin(projectIdProvider, topicAdminClient, subscriptionAdminClient);
}

@Bean
@ConditionalOnMissingBean
public TopicAdminClient topicAdminClient(CredentialsProvider credentialsProvider) {
try {
return TopicAdminClient.create(
TopicAdminSettings.defaultBuilder()
.setCredentialsProvider(credentialsProvider)
.build());
}
catch (IOException ioe) {
throw new PubsubException("An error occurred while creating TopicAdminClient.", ioe);
}
}

@Bean
@ConditionalOnMissingBean
public SubscriptionAdminClient subscriptionAdminClient(
CredentialsProvider credentialsProvider) {
try {
return SubscriptionAdminClient.create(
SubscriptionAdminSettings.defaultBuilder()
.setCredentialsProvider(credentialsProvider)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we have an additional PubsubAdmin ctor based on the CredentialsProvider to instantiate TopicAdminClient and SubscriptionAdminClient internally similar way?

Just in case for flexibility to let end-user to avoid this boilerplate code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I also like that kind of simplicity, but we would then need to have that kind of low level resource handling in the constructor you were opposed to earlier. i.e., we would need to instantiate TopicAdminClient and SubscriptionAdminClient in the constructor, and they can throw IOException.

But if we're OK with it, I can add it. I personally don't mind the IOException in the constructor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking to the private SubscriptionAdminSettings(Builder settingsBuilder) throws IOException { it is really unclear why an IOException is there from the day first at all.

So, having all these builders I don't think we really get access to low-level resource.
Anyway what you have here via dependency injection is still " low level resource handling" if you are interested in my opinion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only concern left.
At least give some answer, please.
Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, didn't mean to not reply here, just went grab lunch in the meantime. :)

IOException is thrown because of the channel creation, and maybe other cases, although I'm not sure about the exact circumstances it can happen (internet endpoint is unreachable?). Because of that, I think you do have access to low level resources, like a TCP connection, when you're creating a Subscription.

I really like this way because you still have to handle low level resources, but you do that away from the constructor. If you need to handle the IOException, you do it away from the constructor. And it should still be easy enough for users to just call new PubsubAdmin(projectIdProvider, TopicAdminClient.create(), SubscriptionAdminClient.create()).

Although, to be honest, I'm not opposed to offer that constructor on the side too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Fine with me. Looks like we get IOException from the CredentialsProvider.getCredentials() when we open a file on the matter.

No more discussion on the matter! I can live with this. 😄

.build());
}
catch (IOException ioe) {
throw new PubsubException("An error occurred while creating SubscriptionAdminClient.",
ioe);
}
}
}