From 5068e1b7bbeea191f078ef85e9a45673b4abf158 Mon Sep 17 00:00:00 2001 From: Garrett Jones Date: Tue, 10 Nov 2015 11:07:00 -0800 Subject: [PATCH 1/5] Generated Pub/Sub client classes and unit tests --- gcloud-java-pubsub/pom.xml | 6 + .../gcloud/pubsub/spi/PublisherApi.java | 498 +++++++++++++ .../gcloud/pubsub/spi/SubscriberApi.java | 663 ++++++++++++++++++ .../gcloud/pubsub/spi/LocalPublisherImpl.java | 138 ++++ .../gcloud/pubsub/spi/PublisherApiTest.java | 175 +++++ 5 files changed, 1480 insertions(+) create mode 100644 gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java create mode 100644 gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java create mode 100644 gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/LocalPublisherImpl.java create mode 100644 gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java diff --git a/gcloud-java-pubsub/pom.xml b/gcloud-java-pubsub/pom.xml index 3bcdd9d68f12..df9debe4a030 100644 --- a/gcloud-java-pubsub/pom.xml +++ b/gcloud-java-pubsub/pom.xml @@ -32,6 +32,12 @@ auto-value 1.1 + + junit + junit + 4.12 + test + diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java new file mode 100644 index 000000000000..3c239c3923e4 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java @@ -0,0 +1,498 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * EDITING INSTRUCTIONS + * This file was generated from the file google/pubsub/v1/pubsub.proto, + * and updates to that file get reflected here through a regular refresh process. + * However, manual additions are allowed because the refresh process performs + * a 3-way merge in order to preserve those manual additions. In order to not + * break the refresh process, only certain types of modifications are + * allowed. + * + * Allowed modifications - currently there is only one type allowed: + * 1. New methods (these should be added to the end of the class) + * + * Happy editing! + */ +package com.google.gcloud.pubsub.spi; + +import com.google.protobuf.Empty; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; + +import io.gapi.gax.grpc.ApiCallable; +import io.gapi.gax.grpc.PageDescriptor; +import io.gapi.gax.grpc.ServiceApiSettings; +import io.gapi.gax.internal.ApiUtils; +import io.gapi.gax.protobuf.PathTemplate; +import io.grpc.Channel; +import io.grpc.ManagedChannel; + +import java.io.IOException; +import java.util.List; + +// Manually-added imports: add custom (non-generated) imports after this point. + + + +// AUTO-GENERATED DOCUMENTATION AND SERVICE - see instructions at the top of the file for editing. +/** + * The service that an application uses to manipulate topics, and to send + * messages to a topic. + */ +@javax.annotation.Generated("by the veneer generator") +public class PublisherApi implements AutoCloseable { + + // ========= + // Constants + // ========= + + /** + * The default address of the service. + */ + public static final String SERVICE_ADDRESS = "pubsub-experimental.googleapis.com"; + + /** + * The default port of the service. + */ + public static final int DEFAULT_SERVICE_PORT = 443; + + + public static final ApiCallable + CREATE_TOPIC = ApiCallable.create(PublisherGrpc.METHOD_CREATE_TOPIC); + public static final ApiCallable + PUBLISH = ApiCallable.create(PublisherGrpc.METHOD_PUBLISH); + public static final ApiCallable + GET_TOPIC = ApiCallable.create(PublisherGrpc.METHOD_GET_TOPIC); + public static final ApiCallable + LIST_TOPICS = ApiCallable.create(PublisherGrpc.METHOD_LIST_TOPICS); + public static final ApiCallable + LIST_TOPIC_SUBSCRIPTIONS = ApiCallable.create(PublisherGrpc.METHOD_LIST_TOPIC_SUBSCRIPTIONS); + public static final ApiCallable + DELETE_TOPIC = ApiCallable.create(PublisherGrpc.METHOD_DELETE_TOPIC); + + + + + private static PageDescriptor LIST_TOPICS_PAGE_DESC = + new PageDescriptor() { + @Override public Object emptyToken() { + return ""; + } + @Override public ListTopicsRequest injectToken( + ListTopicsRequest payload, Object token) { + return ListTopicsRequest + .newBuilder(payload) + .setPageToken((String) token) + .build(); + } + @Override + public Object extractNextToken(ListTopicsResponse payload) { + return payload.getNextPageToken(); + } + @Override + public Iterable extractResources(ListTopicsResponse payload) { + return payload.getTopicsList(); + } + }; + private static PageDescriptor LIST_TOPIC_SUBSCRIPTIONS_PAGE_DESC = + new PageDescriptor() { + @Override public Object emptyToken() { + return ""; + } + @Override public ListTopicSubscriptionsRequest injectToken( + ListTopicSubscriptionsRequest payload, Object token) { + return ListTopicSubscriptionsRequest + .newBuilder(payload) + .setPageToken((String) token) + .build(); + } + @Override + public Object extractNextToken(ListTopicSubscriptionsResponse payload) { + return payload.getNextPageToken(); + } + @Override + public Iterable extractResources(ListTopicSubscriptionsResponse payload) { + return payload.getSubscriptionsList(); + } + }; + + private static String ALL_SCOPES[] = { + "https://www.googleapis.com/auth/pubsub" + }; + + public static final PathTemplate PROJECT_PATH_TEMPLATE = + PathTemplate.create("/projects/{project}"); + public static final PathTemplate TOPIC_PATH_TEMPLATE = + PathTemplate.create("/projects/{project}/topics/{topic}"); + + // ======== + // Members + // ======== + + private final ManagedChannel channel; + private final ServiceApiSettings settings; + + // =============== + // Factory Methods + // =============== + + /** + * Constructs an instance of PublisherApi with default settings. + */ + public static PublisherApi create() throws IOException { + return create(new ServiceApiSettings()); + } + + /** + * Constructs an instance of PublisherApi, using the given settings. The channels are created based + * on the settings passed in, or defaults for any settings that are not set. + */ + public static PublisherApi create(ServiceApiSettings settings) throws IOException { + return new PublisherApi(settings); + } + + private PublisherApi(ServiceApiSettings settings) throws IOException { + ServiceApiSettings internalSettings = ApiUtils.settingsWithChannels(settings, + SERVICE_ADDRESS, DEFAULT_SERVICE_PORT, ALL_SCOPES); + this.settings = internalSettings; + this.channel = internalSettings.getChannel(); + } + + // ============================== + // Resource Name Helper Functions + // ============================== + + public static final String createProject(String project) { + return PROJECT_PATH_TEMPLATE.instantiate( + "project", project); + } + + public static final String createTopic(String project, String topic) { + return TOPIC_PATH_TEMPLATE.instantiate( + "project", project,"topic", topic); + } + + + // ======== + // Getters + // ======== + + public Channel getChannel() { + return channel; + } + + + // ============= + // Service Calls + // ============= + + // ----- createTopic ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates the given topic with the given name. + * + * @param name The name of the topic. It must have the format + * `"projects/{project}/topics/{topic}"`. `{topic}` must start with a letter, + * and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`), + * underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent + * signs (`%`). It must be between 3 and 255 characters in length, and it + * must not start with `"goog"`. + */ + public Topic createTopic(String name) { + Topic request = + Topic.newBuilder() + .setName(name) + .build(); + + return createTopic(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates the given topic with the given name. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Topic createTopic(Topic request) { + return createTopicCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates the given topic with the given name. + */ + public ApiCallable createTopicCallable() { + return ApiUtils.prepareIdempotentCallable(CREATE_TOPIC, settings).bind(channel); + } + + // ----- publish ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Adds one or more messages to the topic. Returns NOT_FOUND if the topic does + * not exist. The message payload must not be empty; it must contain either a + * non-empty data field, or at least one attribute. + * + * @param topic The messages in the request will be published on this topic. + * @param messages The messages to publish. + */ + public PublishResponse publish(String topic, List messages) { + PublishRequest request = + PublishRequest.newBuilder() + .setTopic(topic) + .addAllMessages(messages) + .build(); + + return publish(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Adds one or more messages to the topic. Returns NOT_FOUND if the topic does + * not exist. The message payload must not be empty; it must contain either a + * non-empty data field, or at least one attribute. + * + * @param request The request object containing all of the parameters for the API call. + */ + public PublishResponse publish(PublishRequest request) { + return publishCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Adds one or more messages to the topic. Returns NOT_FOUND if the topic does + * not exist. The message payload must not be empty; it must contain either a + * non-empty data field, or at least one attribute. + */ + public ApiCallable publishCallable() { + return PUBLISH.bind(channel); + } + + // ----- getTopic ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration of a topic. + * + * @param topic The name of the topic to get. + */ + public Topic getTopic(String topic) { + GetTopicRequest request = + GetTopicRequest.newBuilder() + .setTopic(topic) + .build(); + + return getTopic(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration of a topic. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Topic getTopic(GetTopicRequest request) { + return getTopicCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration of a topic. + */ + public ApiCallable getTopicCallable() { + return ApiUtils.prepareIdempotentCallable(GET_TOPIC, settings).bind(channel); + } + + // ----- listTopics ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching topics. + */ + public Iterable listTopics(String project) { + ListTopicsRequest request = + ListTopicsRequest.newBuilder() + .setProject(project) + .build(); + return listTopics(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching topics. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Iterable listTopics(ListTopicsRequest request) { + return listTopicsStreamingCallable() + .iterableResponseStreamCall(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching topics. + */ + public ApiCallable listTopicsStreamingCallable() { + return listTopicsCallable().pageStreaming(LIST_TOPICS_PAGE_DESC); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching topics. + */ + public ApiCallable listTopicsCallable() { + return ApiUtils.prepareIdempotentCallable(LIST_TOPICS, settings).bind(channel); + } + + // ----- listTopicSubscriptions ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists the name of the subscriptions for this topic. + */ + public Iterable listTopicSubscriptions(String topic) { + ListTopicSubscriptionsRequest request = + ListTopicSubscriptionsRequest.newBuilder() + .setTopic(topic) + .build(); + return listTopicSubscriptions(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists the name of the subscriptions for this topic. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Iterable listTopicSubscriptions(ListTopicSubscriptionsRequest request) { + return listTopicSubscriptionsStreamingCallable() + .iterableResponseStreamCall(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists the name of the subscriptions for this topic. + */ + public ApiCallable listTopicSubscriptionsStreamingCallable() { + return listTopicSubscriptionsCallable().pageStreaming(LIST_TOPIC_SUBSCRIPTIONS_PAGE_DESC); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists the name of the subscriptions for this topic. + */ + public ApiCallable listTopicSubscriptionsCallable() { + return ApiUtils.prepareIdempotentCallable(LIST_TOPIC_SUBSCRIPTIONS, settings).bind(channel); + } + + // ----- deleteTopic ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes the topic with the given name. Returns NOT_FOUND if the topic does + * not exist. After a topic is deleted, a new topic may be created with the + * same name; this is an entirely new topic with none of the old + * configuration or subscriptions. Existing subscriptions to this topic are + * not deleted, but their `topic` field is set to `_deleted-topic_`. + * + * @param topic Name of the topic to delete. + */ + public void deleteTopic(String topic) { + DeleteTopicRequest request = + DeleteTopicRequest.newBuilder() + .setTopic(topic) + .build(); + + deleteTopic(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes the topic with the given name. Returns NOT_FOUND if the topic does + * not exist. After a topic is deleted, a new topic may be created with the + * same name; this is an entirely new topic with none of the old + * configuration or subscriptions. Existing subscriptions to this topic are + * not deleted, but their `topic` field is set to `_deleted-topic_`. + * + * @param request The request object containing all of the parameters for the API call. + */ + public void deleteTopic(DeleteTopicRequest request) { + deleteTopicCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes the topic with the given name. Returns NOT_FOUND if the topic does + * not exist. After a topic is deleted, a new topic may be created with the + * same name; this is an entirely new topic with none of the old + * configuration or subscriptions. Existing subscriptions to this topic are + * not deleted, but their `topic` field is set to `_deleted-topic_`. + */ + public ApiCallable deleteTopicCallable() { + return ApiUtils.prepareIdempotentCallable(DELETE_TOPIC, settings).bind(channel); + } + + + // ======== + // Cleanup + // ======== + + /** + * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately + * cancelled. + */ + @Override public void close() { + // Manually-added shutdown code + + // Auto-generated shutdown code + channel.shutdown(); + + // Manually-added shutdown code + } + + + // ======== + // Manually-added methods: add custom (non-generated) methods after this point. + // ======== + +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java new file mode 100644 index 000000000000..6afe0da72d51 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java @@ -0,0 +1,663 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * EDITING INSTRUCTIONS + * This file was generated from the file google/pubsub/v1/pubsub.proto, + * and updates to that file get reflected here through a regular refresh process. + * However, manual additions are allowed because the refresh process performs + * a 3-way merge in order to preserve those manual additions. In order to not + * break the refresh process, only certain types of modifications are + * allowed. + * + * Allowed modifications - currently there is only one type allowed: + * 1. New methods (these should be added to the end of the class) + * + * Happy editing! + */ +package com.google.gcloud.pubsub.spi; + +import com.google.protobuf.Empty; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.DeleteSubscriptionRequest; +import com.google.pubsub.v1.GetSubscriptionRequest; +import com.google.pubsub.v1.ListSubscriptionsRequest; +import com.google.pubsub.v1.ListSubscriptionsResponse; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import com.google.pubsub.v1.ModifyPushConfigRequest; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.SubscriberGrpc; +import com.google.pubsub.v1.Subscription; + +import io.gapi.gax.grpc.ApiCallable; +import io.gapi.gax.grpc.PageDescriptor; +import io.gapi.gax.grpc.ServiceApiSettings; +import io.gapi.gax.internal.ApiUtils; +import io.gapi.gax.protobuf.PathTemplate; +import io.grpc.Channel; +import io.grpc.ManagedChannel; + +import java.io.IOException; +import java.util.List; + +// Manually-added imports: add custom (non-generated) imports after this point. + + + +// AUTO-GENERATED DOCUMENTATION AND SERVICE - see instructions at the top of the file for editing. +/** + * The service that an application uses to manipulate subscriptions and to + * consume messages from a subscription via the Pull method. + */ +@javax.annotation.Generated("by the veneer generator") +public class SubscriberApi implements AutoCloseable { + + // ========= + // Constants + // ========= + + /** + * The default address of the service. + */ + public static final String SERVICE_ADDRESS = "pubsub-experimental.googleapis.com"; + + /** + * The default port of the service. + */ + public static final int DEFAULT_SERVICE_PORT = 443; + + + public static final ApiCallable + CREATE_SUBSCRIPTION = ApiCallable.create(SubscriberGrpc.METHOD_CREATE_SUBSCRIPTION); + public static final ApiCallable + GET_SUBSCRIPTION = ApiCallable.create(SubscriberGrpc.METHOD_GET_SUBSCRIPTION); + public static final ApiCallable + LIST_SUBSCRIPTIONS = ApiCallable.create(SubscriberGrpc.METHOD_LIST_SUBSCRIPTIONS); + public static final ApiCallable + DELETE_SUBSCRIPTION = ApiCallable.create(SubscriberGrpc.METHOD_DELETE_SUBSCRIPTION); + public static final ApiCallable + MODIFY_ACK_DEADLINE = ApiCallable.create(SubscriberGrpc.METHOD_MODIFY_ACK_DEADLINE); + public static final ApiCallable + ACKNOWLEDGE = ApiCallable.create(SubscriberGrpc.METHOD_ACKNOWLEDGE); + public static final ApiCallable + PULL = ApiCallable.create(SubscriberGrpc.METHOD_PULL); + public static final ApiCallable + MODIFY_PUSH_CONFIG = ApiCallable.create(SubscriberGrpc.METHOD_MODIFY_PUSH_CONFIG); + + + + private static PageDescriptor LIST_SUBSCRIPTIONS_PAGE_DESC = + new PageDescriptor() { + @Override public Object emptyToken() { + return ""; + } + @Override public ListSubscriptionsRequest injectToken( + ListSubscriptionsRequest payload, Object token) { + return ListSubscriptionsRequest + .newBuilder(payload) + .setPageToken((String) token) + .build(); + } + @Override + public Object extractNextToken(ListSubscriptionsResponse payload) { + return payload.getNextPageToken(); + } + @Override + public Iterable extractResources(ListSubscriptionsResponse payload) { + return payload.getSubscriptionsList(); + } + }; + + + + + + private static String ALL_SCOPES[] = { + "https://www.googleapis.com/auth/pubsub" + }; + + public static final PathTemplate PROJECT_PATH_TEMPLATE = + PathTemplate.create("/projects/{project}"); + public static final PathTemplate SUBSCRIPTION_PATH_TEMPLATE = + PathTemplate.create("/projects/{project}/subscriptions/{subscription}"); + + // ======== + // Members + // ======== + + private final ManagedChannel channel; + private final ServiceApiSettings settings; + + // =============== + // Factory Methods + // =============== + + /** + * Constructs an instance of SubscriberApi with default settings. + */ + public static SubscriberApi create() throws IOException { + return create(new ServiceApiSettings()); + } + + /** + * Constructs an instance of SubscriberApi, using the given settings. The channels are created based + * on the settings passed in, or defaults for any settings that are not set. + */ + public static SubscriberApi create(ServiceApiSettings settings) throws IOException { + return new SubscriberApi(settings); + } + + private SubscriberApi(ServiceApiSettings settings) throws IOException { + ServiceApiSettings internalSettings = ApiUtils.settingsWithChannels(settings, + SERVICE_ADDRESS, DEFAULT_SERVICE_PORT, ALL_SCOPES); + this.settings = internalSettings; + this.channel = internalSettings.getChannel(); + } + + // ============================== + // Resource Name Helper Functions + // ============================== + + public static final String createProject(String project) { + return PROJECT_PATH_TEMPLATE.instantiate( + "project", project); + } + + public static final String createSubscription(String project, String subscription) { + return SUBSCRIPTION_PATH_TEMPLATE.instantiate( + "project", project,"subscription", subscription); + } + + + // ======== + // Getters + // ======== + + public Channel getChannel() { + return channel; + } + + + // ============= + // Service Calls + // ============= + + // ----- createSubscription ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates a subscription to a given topic for a given subscriber. + * If the subscription already exists, returns ALREADY_EXISTS. + * If the corresponding topic doesn't exist, returns NOT_FOUND. + * + * If the name is not provided in the request, the server will assign a random + * name for this subscription on the same project as the topic. + * + * @param name The name of the subscription. It must have the format + * `"projects/{project}/subscriptions/{subscription}"`. `{subscription}` must + * start with a letter, and contain only letters (`[A-Za-z]`), numbers + * (`[0-9]`), dashes (`-`), underscores (`_`), periods (`.`), tildes (`~`), + * plus (`+`) or percent signs (`%`). It must be between 3 and 255 characters + * in length, and it must not start with `"goog"`. + * @param topic The name of the topic from which this subscription is receiving messages. + * The value of this field will be `_deleted-topic_` if the topic has been + * deleted. + * @param pushConfig If push delivery is used with this subscription, this field is + * used to configure it. An empty pushConfig signifies that the subscriber + * will pull and ack messages using API methods. + * @param ackDeadlineSeconds This value is the maximum time after a subscriber receives a message + * before the subscriber should acknowledge the message. After message + * delivery but before the ack deadline expires and before the message is + * acknowledged, it is an outstanding message and will not be delivered + * again during that time (on a best-effort basis). + * + * For pull delivery this value is used as the initial value for the ack + * deadline. To override this value for a given message, call + * ModifyAckDeadline with the corresponding ack_id. + * + * For push delivery, this value is also used to set the request timeout for + * the call to the push endpoint. + * + * If the subscriber never acknowledges the message, the Pub/Sub + * system will eventually redeliver the message. + * + * If this parameter is not set, the default value of 10 seconds is used. + */ + public Subscription createSubscription(String name, String topic, PushConfig pushConfig, int ackDeadlineSeconds) { + Subscription request = + Subscription.newBuilder() + .setName(name) + .setTopic(topic) + .setPushConfig(pushConfig) + .setAckDeadlineSeconds(ackDeadlineSeconds) + .build(); + + return createSubscription(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates a subscription to a given topic for a given subscriber. + * If the subscription already exists, returns ALREADY_EXISTS. + * If the corresponding topic doesn't exist, returns NOT_FOUND. + * + * If the name is not provided in the request, the server will assign a random + * name for this subscription on the same project as the topic. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Subscription createSubscription(Subscription request) { + return createSubscriptionCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates a subscription to a given topic for a given subscriber. + * If the subscription already exists, returns ALREADY_EXISTS. + * If the corresponding topic doesn't exist, returns NOT_FOUND. + * + * If the name is not provided in the request, the server will assign a random + * name for this subscription on the same project as the topic. + */ + public ApiCallable createSubscriptionCallable() { + return ApiUtils.prepareIdempotentCallable(CREATE_SUBSCRIPTION, settings).bind(channel); + } + + // ----- getSubscription ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration details of a subscription. + * + * @param subscription The name of the subscription to get. + */ + public Subscription getSubscription(String subscription) { + GetSubscriptionRequest request = + GetSubscriptionRequest.newBuilder() + .setSubscription(subscription) + .build(); + + return getSubscription(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration details of a subscription. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Subscription getSubscription(GetSubscriptionRequest request) { + return getSubscriptionCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration details of a subscription. + */ + public ApiCallable getSubscriptionCallable() { + return ApiUtils.prepareIdempotentCallable(GET_SUBSCRIPTION, settings).bind(channel); + } + + // ----- listSubscriptions ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching subscriptions. + */ + public Iterable listSubscriptions(String project) { + ListSubscriptionsRequest request = + ListSubscriptionsRequest.newBuilder() + .setProject(project) + .build(); + return listSubscriptions(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching subscriptions. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Iterable listSubscriptions(ListSubscriptionsRequest request) { + return listSubscriptionsStreamingCallable() + .iterableResponseStreamCall(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching subscriptions. + */ + public ApiCallable listSubscriptionsStreamingCallable() { + return listSubscriptionsCallable().pageStreaming(LIST_SUBSCRIPTIONS_PAGE_DESC); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching subscriptions. + */ + public ApiCallable listSubscriptionsCallable() { + return ApiUtils.prepareIdempotentCallable(LIST_SUBSCRIPTIONS, settings).bind(channel); + } + + // ----- deleteSubscription ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes an existing subscription. All pending messages in the subscription + * are immediately dropped. Calls to Pull after deletion will return + * NOT_FOUND. After a subscription is deleted, a new one may be created with + * the same name, but the new one has no association with the old + * subscription, or its topic unless the same topic is specified. + * + * @param subscription The subscription to delete. + */ + public void deleteSubscription(String subscription) { + DeleteSubscriptionRequest request = + DeleteSubscriptionRequest.newBuilder() + .setSubscription(subscription) + .build(); + + deleteSubscription(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes an existing subscription. All pending messages in the subscription + * are immediately dropped. Calls to Pull after deletion will return + * NOT_FOUND. After a subscription is deleted, a new one may be created with + * the same name, but the new one has no association with the old + * subscription, or its topic unless the same topic is specified. + * + * @param request The request object containing all of the parameters for the API call. + */ + public void deleteSubscription(DeleteSubscriptionRequest request) { + deleteSubscriptionCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes an existing subscription. All pending messages in the subscription + * are immediately dropped. Calls to Pull after deletion will return + * NOT_FOUND. After a subscription is deleted, a new one may be created with + * the same name, but the new one has no association with the old + * subscription, or its topic unless the same topic is specified. + */ + public ApiCallable deleteSubscriptionCallable() { + return ApiUtils.prepareIdempotentCallable(DELETE_SUBSCRIPTION, settings).bind(channel); + } + + // ----- modifyAckDeadline ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the ack deadline for a specific message. This method is useful to + * indicate that more time is needed to process a message by the subscriber, + * or to make the message available for redelivery if the processing was + * interrupted. + * + * @param subscription The name of the subscription. + * @param ackIds List of acknowledgment IDs. + * @param ackDeadlineSeconds The new ack deadline with respect to the time this request was sent to the + * Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack + * deadline will expire 10 seconds after the ModifyAckDeadline call was made. + * Specifying zero may immediately make the message available for another pull + * request. + */ + public void modifyAckDeadline(String subscription, List ackIds, int ackDeadlineSeconds) { + ModifyAckDeadlineRequest request = + ModifyAckDeadlineRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(ackIds) + .setAckDeadlineSeconds(ackDeadlineSeconds) + .build(); + + modifyAckDeadline(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the ack deadline for a specific message. This method is useful to + * indicate that more time is needed to process a message by the subscriber, + * or to make the message available for redelivery if the processing was + * interrupted. + * + * @param request The request object containing all of the parameters for the API call. + */ + public void modifyAckDeadline(ModifyAckDeadlineRequest request) { + modifyAckDeadlineCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the ack deadline for a specific message. This method is useful to + * indicate that more time is needed to process a message by the subscriber, + * or to make the message available for redelivery if the processing was + * interrupted. + */ + public ApiCallable modifyAckDeadlineCallable() { + return MODIFY_ACK_DEADLINE.bind(channel); + } + + // ----- acknowledge ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Acknowledges the messages associated with the ack tokens in the + * AcknowledgeRequest. The Pub/Sub system can remove the relevant messages + * from the subscription. + * + * Acknowledging a message whose ack deadline has expired may succeed, + * but such a message may be redelivered later. Acknowledging a message more + * than once will not result in an error. + * + * @param subscription The subscription whose message is being acknowledged. + * @param ackIds The acknowledgment ID for the messages being acknowledged that was returned + * by the Pub/Sub system in the Pull response. Must not be empty. + */ + public void acknowledge(String subscription, List ackIds) { + AcknowledgeRequest request = + AcknowledgeRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(ackIds) + .build(); + + acknowledge(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Acknowledges the messages associated with the ack tokens in the + * AcknowledgeRequest. The Pub/Sub system can remove the relevant messages + * from the subscription. + * + * Acknowledging a message whose ack deadline has expired may succeed, + * but such a message may be redelivered later. Acknowledging a message more + * than once will not result in an error. + * + * @param request The request object containing all of the parameters for the API call. + */ + public void acknowledge(AcknowledgeRequest request) { + acknowledgeCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Acknowledges the messages associated with the ack tokens in the + * AcknowledgeRequest. The Pub/Sub system can remove the relevant messages + * from the subscription. + * + * Acknowledging a message whose ack deadline has expired may succeed, + * but such a message may be redelivered later. Acknowledging a message more + * than once will not result in an error. + */ + public ApiCallable acknowledgeCallable() { + return ACKNOWLEDGE.bind(channel); + } + + // ----- pull ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Pulls messages from the server. Returns an empty list if there are no + * messages available in the backlog. The server may return UNAVAILABLE if + * there are too many concurrent pull requests pending for the given + * subscription. + * + * @param subscription The subscription from which messages should be pulled. + * @param returnImmediately If this is specified as true the system will respond immediately even if + * it is not able to return a message in the Pull response. Otherwise the + * system is allowed to wait until at least one message is available rather + * than returning no messages. The client may cancel the request if it does + * not wish to wait any longer for the response. + * @param maxMessages The maximum number of messages returned for this request. The Pub/Sub + * system may return fewer than the number specified. + */ + public PullResponse pull(String subscription, boolean returnImmediately, int maxMessages) { + PullRequest request = + PullRequest.newBuilder() + .setSubscription(subscription) + .setReturnImmediately(returnImmediately) + .setMaxMessages(maxMessages) + .build(); + + return pull(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Pulls messages from the server. Returns an empty list if there are no + * messages available in the backlog. The server may return UNAVAILABLE if + * there are too many concurrent pull requests pending for the given + * subscription. + * + * @param request The request object containing all of the parameters for the API call. + */ + public PullResponse pull(PullRequest request) { + return pullCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Pulls messages from the server. Returns an empty list if there are no + * messages available in the backlog. The server may return UNAVAILABLE if + * there are too many concurrent pull requests pending for the given + * subscription. + */ + public ApiCallable pullCallable() { + return PULL.bind(channel); + } + + // ----- modifyPushConfig ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the PushConfig for a specified subscription. + * + * This may be used to change a push subscription to a pull one (signified + * by an empty PushConfig) or vice versa, or change the endpoint URL and other + * attributes of a push subscription. Messages will accumulate for + * delivery continuously through the call regardless of changes to the + * PushConfig. + * + * @param subscription The name of the subscription. + * @param pushConfig The push configuration for future deliveries. + * + * An empty pushConfig indicates that the Pub/Sub system should + * stop pushing messages from the given subscription and allow + * messages to be pulled and acknowledged - effectively pausing + * the subscription if Pull is not called. + */ + public void modifyPushConfig(String subscription, PushConfig pushConfig) { + ModifyPushConfigRequest request = + ModifyPushConfigRequest.newBuilder() + .setSubscription(subscription) + .setPushConfig(pushConfig) + .build(); + + modifyPushConfig(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the PushConfig for a specified subscription. + * + * This may be used to change a push subscription to a pull one (signified + * by an empty PushConfig) or vice versa, or change the endpoint URL and other + * attributes of a push subscription. Messages will accumulate for + * delivery continuously through the call regardless of changes to the + * PushConfig. + * + * @param request The request object containing all of the parameters for the API call. + */ + public void modifyPushConfig(ModifyPushConfigRequest request) { + modifyPushConfigCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the PushConfig for a specified subscription. + * + * This may be used to change a push subscription to a pull one (signified + * by an empty PushConfig) or vice versa, or change the endpoint URL and other + * attributes of a push subscription. Messages will accumulate for + * delivery continuously through the call regardless of changes to the + * PushConfig. + */ + public ApiCallable modifyPushConfigCallable() { + return MODIFY_PUSH_CONFIG.bind(channel); + } + + + // ======== + // Cleanup + // ======== + + /** + * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately + * cancelled. + */ + @Override public void close() { + // Manually-added shutdown code + + // Auto-generated shutdown code + channel.shutdown(); + + // Manually-added shutdown code + } + + + // ======== + // Manually-added methods: add custom (non-generated) methods after this point. + // ======== + +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/LocalPublisherImpl.java b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/LocalPublisherImpl.java new file mode 100644 index 000000000000..c6cd637dd980 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/LocalPublisherImpl.java @@ -0,0 +1,138 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.gcloud.pubsub.spi; + +import com.google.protobuf.Empty; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc.Publisher; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; + +import io.grpc.stub.StreamObserver; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class LocalPublisherImpl implements Publisher { + + private Map> topics = new HashMap<>(); + + @Override + public void createTopic(Topic request, StreamObserver responseObserver) { + topics.put(request.getName(), new ArrayList()); + + Topic response = Topic.newBuilder().setName(request.getName()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void publish(PublishRequest request, StreamObserver responseObserver) { + List topicMessages = topics.get(request.getTopic()); + List ids = new ArrayList<>(); + int index = 0; + for (PubsubMessage msg : request.getMessagesList()) { + topicMessages.add(msg); + ids.add(new Integer(index).toString()); + } + + responseObserver.onNext(PublishResponse.newBuilder().addAllMessageIds(ids).build()); + responseObserver.onCompleted(); + } + + @Override + public void getTopic(GetTopicRequest request, StreamObserver responseObserver) { + if (topics.get(request.getTopic()) == null) { + throw new IllegalArgumentException("topic doesn't exist: " + request.getTopic()); + } + + Topic response = Topic.newBuilder().setName(request.getTopic()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void listTopics(ListTopicsRequest request, StreamObserver responseObserver) { + List responseTopics = new ArrayList<>(); + for (String topicName : topics.keySet()) { + String projectOfTopic = PublisherApi.TOPIC_PATH_TEMPLATE.parse(topicName).get("project"); + String projectOfRequest = PublisherApi.PROJECT_PATH_TEMPLATE.parse(request.getProject()).get("project"); + if (projectOfTopic.equals(projectOfRequest)) { + Topic topicObj = Topic.newBuilder().setName(topicName).build(); + responseTopics.add(topicObj); + } + } + Collections.sort(responseTopics, new Comparator() { + @Override public int compare(Topic o1, Topic o2) { + return o1.getName().compareTo(o2.getName()); + } + }); + ListTopicsResponse.Builder response = ListTopicsResponse.newBuilder(); + response.setNextPageToken(""); + response.addAllTopics(responseTopics); + responseObserver.onNext(response.build()); + responseObserver.onCompleted(); + } + + @Override + public void listTopicSubscriptions(ListTopicSubscriptionsRequest request, + StreamObserver responseObserver) { + responseObserver.onNext(ListTopicSubscriptionsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void deleteTopic(DeleteTopicRequest request, StreamObserver responseObserver) { + topics.remove(request.getTopic()); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + + public Map> getTopics() { + return topics; + } + + public void reset() { + topics = new HashMap<>(); + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java new file mode 100644 index 000000000000..41dc3bb7c160 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java @@ -0,0 +1,175 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.gcloud.pubsub.spi; + +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; + +import io.gapi.gax.grpc.ServiceApiSettings; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class PublisherApiTest { + private static Server server; + private static LocalPublisherImpl publisherImpl; + private ManagedChannel channel; + private PublisherApi publisherApi; + + @BeforeClass + public static void startStaticServer() { + publisherImpl = new LocalPublisherImpl(); + NettyServerBuilder builder = NettyServerBuilder + .forAddress(new LocalAddress("in-process-1")) + .flowControlWindow(65 * 1024) + .channelType(LocalServerChannel.class); + builder.addService(PublisherGrpc.bindService(publisherImpl)); + try { + server = builder.build().start(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @AfterClass + public static void stopServer() { + server.shutdownNow(); + } + + @Before + public void setUp() throws Exception { + publisherImpl.reset(); + channel = NettyChannelBuilder + .forAddress(new LocalAddress("in-process-1")) + .negotiationType(NegotiationType.PLAINTEXT) + .channelType(LocalChannel.class) + .build(); + + ServiceApiSettings settings = new ServiceApiSettings(); + settings.setChannel(channel); + publisherApi = PublisherApi.create(settings); + } + + @After + public void tearDown() throws Exception { + if (channel != null) { + channel.shutdown(); + } + if (publisherApi != null) { + publisherApi.close(); + } + publisherImpl.reset(); + } + + @Test + public void testCreateTopic() throws Exception { + String topicName = PublisherApi.createTopic("my-project", "my-topic"); + + Topic result = publisherApi.createTopic(topicName); + Assert.assertEquals(topicName, result.getName()); + + Assert.assertEquals(1, publisherImpl.getTopics().size()); + Assert.assertNotNull(publisherImpl.getTopics().get(topicName)); + } + + @Test + public void testPublish() throws Exception { + String topicName = PublisherApi.createTopic("my-project", "publish-topic"); + + publisherApi.createTopic(topicName); + PubsubMessage msg = PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("pubsub-message")) + .build(); + publisherApi.publish(topicName, Collections.singletonList(msg)); + List publishedMessages = publisherImpl.getTopics().get(topicName); + Assert.assertEquals(1, publishedMessages.size()); + Assert.assertEquals("pubsub-message", publishedMessages.get(0).getData().toStringUtf8()); + } + + @Test + public void testGetTopic() throws Exception { + String topicName = PublisherApi.createTopic("my-project", "fun-topic"); + + publisherApi.createTopic(topicName); + Topic result = publisherApi.getTopic(topicName); + Assert.assertNotNull(result); + Assert.assertEquals(topicName, result.getName()); + } + + @Test + public void testListTopics() throws Exception { + String project1 = PublisherApi.createProject("project.1"); + String topicName1 = PublisherApi.createTopic("project.1", "topic.1"); + String topicName2 = PublisherApi.createTopic("project.1", "topic.2"); + String topicName3 = PublisherApi.createTopic("project.2", "topic.3"); + + publisherApi.createTopic(topicName1); + publisherApi.createTopic(topicName2); + publisherApi.createTopic(topicName3); + + List topics = new ArrayList<>(); + for (Topic topic : publisherApi.listTopics(project1)) { + topics.add(topic); + } + Assert.assertEquals(2, topics.size()); + Assert.assertEquals(topicName1, topics.get(0).getName()); + Assert.assertEquals(topicName2, topics.get(1).getName()); + } + + @Test + public void testDeleteTopic() throws Exception { + String topicName = PublisherApi.createTopic("my-project", "fun-topic"); + + publisherApi.createTopic(topicName); + publisherApi.deleteTopic(topicName); + Assert.assertEquals(0, publisherImpl.getTopics().size()); + } +} From c952c20bd0431c5800383ac75bc8afda24525c5e Mon Sep 17 00:00:00 2001 From: Garrett Jones Date: Wed, 11 Nov 2015 11:33:07 -0800 Subject: [PATCH 2/5] Fixing source paths and module configuration --- gcloud-java-gax/pom.xml | 2 +- gcloud-java-pubsub/pom.xml | 2 +- pom.xml | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/gcloud-java-gax/pom.xml b/gcloud-java-gax/pom.xml index bbfd8c6007e5..5710df3d5bfe 100644 --- a/gcloud-java-gax/pom.xml +++ b/gcloud-java-gax/pom.xml @@ -63,7 +63,7 @@ add-source - generated/src/main + generated/src/main/java diff --git a/gcloud-java-pubsub/pom.xml b/gcloud-java-pubsub/pom.xml index df9debe4a030..998adeee4827 100644 --- a/gcloud-java-pubsub/pom.xml +++ b/gcloud-java-pubsub/pom.xml @@ -51,7 +51,7 @@ add-source - generated/src/main + generated/src/main/java diff --git a/pom.xml b/pom.xml index e35a620247d1..44bdccc273a4 100644 --- a/pom.xml +++ b/pom.xml @@ -67,8 +67,10 @@ gcloud-java-core + gcloud-java-gax gcloud-java-datastore gcloud-java-storage + gcloud-java-pubsub gcloud-java gcloud-java-examples From 4fbfb7bfd0cc30b34b1e0a6d832aee58bb7de6f5 Mon Sep 17 00:00:00 2001 From: Garrett Jones Date: Tue, 17 Nov 2015 15:40:58 -0800 Subject: [PATCH 3/5] Updates to address PR comments. * license change * update to notes on code updating * renaming helper functions for creating fully-qualified paths * making certain static final members private * adding some documentation * minor reformating * Creating LocalPubsubHelper, using it in PublisherApiTest * Making the constructor of PublisherApi protected so that subclasses can be created --- .../io/gapi/gax/grpc/ServiceApiSettings.java | 6 +- .../gcloud/pubsub/spi/PublisherApi.java | 102 ++++++++------- .../gcloud/pubsub/spi/SubscriberApi.java | 96 +++++++------- .../spi/testing/LocalPublisherImpl.java | 120 ++++++++++++++++++ .../pubsub/testing/LocalPubsubHelper.java | 83 ++++++++++++ .../gcloud/pubsub/spi/PublisherApiTest.java | 109 ++++------------ 6 files changed, 339 insertions(+), 177 deletions(-) create mode 100644 gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java create mode 100644 gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java index 485d4794d917..13da3abe911b 100644 --- a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java +++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java @@ -105,8 +105,10 @@ public int getPort() { } /** - * An instance of ManagedChannel; shutdown will be called on this channel when - * the instance of LoggingServiceApi is shut down. + * The channel used to send requests to the service. Whichever service api class that + * this instance of ServiceApiSettings is passed to will call shutdown() on this + * channel. This injection mechanism is intended for use by unit tests to override + * the channel that would be created by default for real calls to the service. */ public ServiceApiSettings setChannel(ManagedChannel channel) { this.channel = channel; diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java index 3c239c3923e4..08d4181c8fd3 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java @@ -1,45 +1,31 @@ /* - * Copyright 2015, Google Inc. All rights reserved. + * Copyright 2015 Google Inc. All Rights Reserved. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. + * http://www.apache.org/licenses/LICENSE-2.0 * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. */ /* * EDITING INSTRUCTIONS - * This file was generated from the file google/pubsub/v1/pubsub.proto, - * and updates to that file get reflected here through a regular refresh process. - * However, manual additions are allowed because the refresh process performs + * This file was generated from the file + * https://github.com/google/googleapis/blob/master/google/pubsub/v1/pubsub.proto + * and updates to that file get reflected here through a refresh process. + * For the short term, the refresh process will only be runnable by Google engineers. + * Manual additions are allowed because the refresh process performs * a 3-way merge in order to preserve those manual additions. In order to not * break the refresh process, only certain types of modifications are * allowed. * - * Allowed modifications - currently there is only one type allowed: + * Allowed modifications - currently these are the only types allowed: * 1. New methods (these should be added to the end of the class) + * 2. New imports * * Happy editing! */ @@ -96,28 +82,27 @@ public class PublisherApi implements AutoCloseable { public static final int DEFAULT_SERVICE_PORT = 443; - public static final ApiCallable + private static final ApiCallable CREATE_TOPIC = ApiCallable.create(PublisherGrpc.METHOD_CREATE_TOPIC); - public static final ApiCallable + private static final ApiCallable PUBLISH = ApiCallable.create(PublisherGrpc.METHOD_PUBLISH); - public static final ApiCallable + private static final ApiCallable GET_TOPIC = ApiCallable.create(PublisherGrpc.METHOD_GET_TOPIC); - public static final ApiCallable + private static final ApiCallable LIST_TOPICS = ApiCallable.create(PublisherGrpc.METHOD_LIST_TOPICS); - public static final ApiCallable + private static final ApiCallable LIST_TOPIC_SUBSCRIPTIONS = ApiCallable.create(PublisherGrpc.METHOD_LIST_TOPIC_SUBSCRIPTIONS); - public static final ApiCallable + private static final ApiCallable DELETE_TOPIC = ApiCallable.create(PublisherGrpc.METHOD_DELETE_TOPIC); - - - private static PageDescriptor LIST_TOPICS_PAGE_DESC = new PageDescriptor() { - @Override public Object emptyToken() { + @Override + public Object emptyToken() { return ""; } - @Override public ListTopicsRequest injectToken( + @Override + public ListTopicsRequest injectToken( ListTopicsRequest payload, Object token) { return ListTopicsRequest .newBuilder(payload) @@ -133,12 +118,15 @@ public Iterable extractResources(ListTopicsResponse payload) { return payload.getTopicsList(); } }; + private static PageDescriptor LIST_TOPIC_SUBSCRIPTIONS_PAGE_DESC = new PageDescriptor() { - @Override public Object emptyToken() { + @Override + public Object emptyToken() { return ""; } - @Override public ListTopicSubscriptionsRequest injectToken( + @Override + public ListTopicSubscriptionsRequest injectToken( ListTopicSubscriptionsRequest payload, Object token) { return ListTopicSubscriptionsRequest .newBuilder(payload) @@ -159,8 +147,17 @@ public Iterable extractResources(ListTopicSubscriptionsResponse payload) "https://www.googleapis.com/auth/pubsub" }; + /** + * A PathTemplate representing the fully-qualified path to represent + * a project resource. + */ public static final PathTemplate PROJECT_PATH_TEMPLATE = PathTemplate.create("/projects/{project}"); + + /** + * A PathTemplate representing the fully-qualified path to represent + * a topic resource. + */ public static final PathTemplate TOPIC_PATH_TEMPLATE = PathTemplate.create("/projects/{project}/topics/{topic}"); @@ -190,7 +187,11 @@ public static PublisherApi create(ServiceApiSettings settings) throws IOExceptio return new PublisherApi(settings); } - private PublisherApi(ServiceApiSettings settings) throws IOException { + /** + * Constructs an instance of PublisherApi, using the given settings. This is protected so that it + * easy to make a subclass, but otherwise, the static factory methods should be preferred. + */ + protected PublisherApi(ServiceApiSettings settings) throws IOException { ServiceApiSettings internalSettings = ApiUtils.settingsWithChannels(settings, SERVICE_ADDRESS, DEFAULT_SERVICE_PORT, ALL_SCOPES); this.settings = internalSettings; @@ -201,12 +202,20 @@ private PublisherApi(ServiceApiSettings settings) throws IOException { // Resource Name Helper Functions // ============================== - public static final String createProject(String project) { + /** + * Creates a string containing the fully-qualified path to represent + * a project resource. + */ + public static final String createProjectPath(String project) { return PROJECT_PATH_TEMPLATE.instantiate( "project", project); } - public static final String createTopic(String project, String topic) { + /** + * Creates a string containing the fully-qualified path to represent + * a topic resource. + */ + public static final String createTopicPath(String project, String topic) { return TOPIC_PATH_TEMPLATE.instantiate( "project", project,"topic", topic); } @@ -481,7 +490,8 @@ public ApiCallable deleteTopicCallable() { * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately * cancelled. */ - @Override public void close() { + @Override + public void close() { // Manually-added shutdown code // Auto-generated shutdown code diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java index 6afe0da72d51..6ec9195d25a2 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java @@ -1,45 +1,31 @@ /* - * Copyright 2015, Google Inc. All rights reserved. + * Copyright 2015 Google Inc. All Rights Reserved. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. + * http://www.apache.org/licenses/LICENSE-2.0 * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. */ /* * EDITING INSTRUCTIONS - * This file was generated from the file google/pubsub/v1/pubsub.proto, - * and updates to that file get reflected here through a regular refresh process. - * However, manual additions are allowed because the refresh process performs + * This file was generated from the file + * https://github.com/google/googleapis/blob/master/google/pubsub/v1/pubsub.proto + * and updates to that file get reflected here through a refresh process. + * For the short term, the refresh process will only be runnable by Google engineers. + * Manual additions are allowed because the refresh process performs * a 3-way merge in order to preserve those manual additions. In order to not * break the refresh process, only certain types of modifications are * allowed. * - * Allowed modifications - currently there is only one type allowed: + * Allowed modifications - currently these are the only types allowed: * 1. New methods (these should be added to the end of the class) + * 2. New imports * * Happy editing! */ @@ -97,31 +83,31 @@ public class SubscriberApi implements AutoCloseable { public static final int DEFAULT_SERVICE_PORT = 443; - public static final ApiCallable + private static final ApiCallable CREATE_SUBSCRIPTION = ApiCallable.create(SubscriberGrpc.METHOD_CREATE_SUBSCRIPTION); - public static final ApiCallable + private static final ApiCallable GET_SUBSCRIPTION = ApiCallable.create(SubscriberGrpc.METHOD_GET_SUBSCRIPTION); - public static final ApiCallable + private static final ApiCallable LIST_SUBSCRIPTIONS = ApiCallable.create(SubscriberGrpc.METHOD_LIST_SUBSCRIPTIONS); - public static final ApiCallable + private static final ApiCallable DELETE_SUBSCRIPTION = ApiCallable.create(SubscriberGrpc.METHOD_DELETE_SUBSCRIPTION); - public static final ApiCallable + private static final ApiCallable MODIFY_ACK_DEADLINE = ApiCallable.create(SubscriberGrpc.METHOD_MODIFY_ACK_DEADLINE); - public static final ApiCallable + private static final ApiCallable ACKNOWLEDGE = ApiCallable.create(SubscriberGrpc.METHOD_ACKNOWLEDGE); - public static final ApiCallable + private static final ApiCallable PULL = ApiCallable.create(SubscriberGrpc.METHOD_PULL); - public static final ApiCallable + private static final ApiCallable MODIFY_PUSH_CONFIG = ApiCallable.create(SubscriberGrpc.METHOD_MODIFY_PUSH_CONFIG); - - private static PageDescriptor LIST_SUBSCRIPTIONS_PAGE_DESC = new PageDescriptor() { - @Override public Object emptyToken() { + @Override + public Object emptyToken() { return ""; } - @Override public ListSubscriptionsRequest injectToken( + @Override + public ListSubscriptionsRequest injectToken( ListSubscriptionsRequest payload, Object token) { return ListSubscriptionsRequest .newBuilder(payload) @@ -138,16 +124,21 @@ public Iterable extractResources(ListSubscriptionsResponse payload } }; - - - - private static String ALL_SCOPES[] = { "https://www.googleapis.com/auth/pubsub" }; + /** + * A PathTemplate representing the fully-qualified path to represent + * a project resource. + */ public static final PathTemplate PROJECT_PATH_TEMPLATE = PathTemplate.create("/projects/{project}"); + + /** + * A PathTemplate representing the fully-qualified path to represent + * a subscription resource. + */ public static final PathTemplate SUBSCRIPTION_PATH_TEMPLATE = PathTemplate.create("/projects/{project}/subscriptions/{subscription}"); @@ -188,12 +179,20 @@ private SubscriberApi(ServiceApiSettings settings) throws IOException { // Resource Name Helper Functions // ============================== - public static final String createProject(String project) { + /** + * Creates a string containing the fully-qualified path to represent + * a project resource. + */ + public static final String createProjectPath(String project) { return PROJECT_PATH_TEMPLATE.instantiate( "project", project); } - public static final String createSubscription(String project, String subscription) { + /** + * Creates a string containing the fully-qualified path to represent + * a subscription resource. + */ + public static final String createSubscriptionPath(String project, String subscription) { return SUBSCRIPTION_PATH_TEMPLATE.instantiate( "project", project,"subscription", subscription); } @@ -646,7 +645,8 @@ public ApiCallable modifyPushConfigCallable() { * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately * cancelled. */ - @Override public void close() { + @Override + public void close() { // Manually-added shutdown code // Auto-generated shutdown code diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java new file mode 100644 index 000000000000..b9c5f9d513bd --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java @@ -0,0 +1,120 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.gcloud.pubsub.spi.testing; + +import com.google.gcloud.pubsub.spi.PublisherApi; +import com.google.protobuf.Empty; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc.Publisher; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; + +import io.grpc.stub.StreamObserver; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class LocalPublisherImpl implements Publisher { + + private Map> topics = new HashMap<>(); + + @Override + public void createTopic(Topic request, StreamObserver responseObserver) { + topics.put(request.getName(), new ArrayList()); + + Topic response = Topic.newBuilder().setName(request.getName()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void publish(PublishRequest request, StreamObserver responseObserver) { + List topicMessages = topics.get(request.getTopic()); + List ids = new ArrayList<>(); + int index = 0; + for (PubsubMessage msg : request.getMessagesList()) { + topicMessages.add(msg); + ids.add(new Integer(index).toString()); + } + responseObserver.onNext(PublishResponse.newBuilder().addAllMessageIds(ids).build()); + responseObserver.onCompleted(); + } + + @Override + public void getTopic(GetTopicRequest request, StreamObserver responseObserver) { + if (topics.get(request.getTopic()) == null) { + throw new IllegalArgumentException("topic doesn't exist: " + request.getTopic()); + } + Topic response = Topic.newBuilder().setName(request.getTopic()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void listTopics(ListTopicsRequest request, StreamObserver responseObserver) { + List responseTopics = new ArrayList<>(); + for (String topicName : topics.keySet()) { + String projectOfTopic = PublisherApi.TOPIC_PATH_TEMPLATE.parse(topicName).get("project"); + String projectOfRequest = PublisherApi.PROJECT_PATH_TEMPLATE.parse(request.getProject()).get("project"); + if (projectOfTopic.equals(projectOfRequest)) { + Topic topicObj = Topic.newBuilder().setName(topicName).build(); + responseTopics.add(topicObj); + } + } + Collections.sort(responseTopics, new Comparator() { + @Override public int compare(Topic o1, Topic o2) { + return o1.getName().compareTo(o2.getName()); + } + }); + ListTopicsResponse.Builder response = ListTopicsResponse.newBuilder(); + response.setNextPageToken(""); + response.addAllTopics(responseTopics); + responseObserver.onNext(response.build()); + responseObserver.onCompleted(); + } + + @Override + public void listTopicSubscriptions(ListTopicSubscriptionsRequest request, + StreamObserver responseObserver) { + responseObserver.onNext(ListTopicSubscriptionsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void deleteTopic(DeleteTopicRequest request, StreamObserver responseObserver) { + topics.remove(request.getTopic()); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + + public Map> getTopics() { + return topics; + } + + public void reset() { + topics = new HashMap<>(); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java new file mode 100644 index 000000000000..13fbf5208047 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java @@ -0,0 +1,83 @@ +package com.google.gcloud.pubsub.testing; + +import com.google.gcloud.pubsub.spi.testing.LocalPublisherImpl; +import com.google.pubsub.v1.PublisherGrpc; + +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; + +import java.io.IOException; +import java.net.SocketAddress; + +/** + * A class that runs an in-memory Publisher instance for use in tests. + */ +public class LocalPubsubHelper { + private static final SocketAddress address = new LocalAddress("in-process-1"); + private static Server server; + private static LocalPublisherImpl publisherImpl; + + /** + * Constructs a new LocalPubsubHelper. The method start() must + * be called before it is used. + */ + public LocalPubsubHelper() { + publisherImpl = new LocalPublisherImpl(); + NettyServerBuilder builder = NettyServerBuilder + .forAddress(address) + .flowControlWindow(65 * 1024) + .channelType(LocalServerChannel.class); + builder.addService(PublisherGrpc.bindService(publisherImpl)); + server = builder.build(); + } + + /** + * Starts the in-memory service. + */ + public LocalPubsubHelper start() { + try { + server.start(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + return this; + } + + /** + * Resets the state of the in-memory service. + */ + public void reset() { + publisherImpl.reset(); + } + + /** + * Returns the internal in-memory service. + */ + public LocalPublisherImpl getPublisherImpl() { + return publisherImpl; + } + + /** + * Creates a channel for making requests to the in-memory service. + */ + public ManagedChannel createChannel() { + return NettyChannelBuilder + .forAddress(address) + .negotiationType(NegotiationType.PLAINTEXT) + .channelType(LocalChannel.class) + .build(); + } + + /** + * Shuts down the in-memory service. + */ + public void shutdownNow() { + server.shutdownNow(); + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java index 41dc3bb7c160..96bdd9106497 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java @@ -1,52 +1,26 @@ /* - * Copyright 2015, Google Inc. All rights reserved. + * Copyright 2015 Google Inc. All Rights Reserved. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. + * http://www.apache.org/licenses/LICENSE-2.0 * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. */ package com.google.gcloud.pubsub.spi; +import com.google.gcloud.pubsub.testing.LocalPubsubHelper; import com.google.protobuf.ByteString; -import com.google.pubsub.v1.PublisherGrpc; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.Topic; import io.gapi.gax.grpc.ServiceApiSettings; -import io.grpc.ManagedChannel; -import io.grpc.Server; -import io.grpc.netty.NegotiationType; -import io.grpc.netty.NettyChannelBuilder; -import io.grpc.netty.NettyServerBuilder; -import io.netty.channel.local.LocalAddress; -import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalServerChannel; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -59,85 +33,61 @@ import org.junit.Test; public class PublisherApiTest { - private static Server server; - private static LocalPublisherImpl publisherImpl; - private ManagedChannel channel; + private static LocalPubsubHelper pubsubHelper; private PublisherApi publisherApi; @BeforeClass public static void startStaticServer() { - publisherImpl = new LocalPublisherImpl(); - NettyServerBuilder builder = NettyServerBuilder - .forAddress(new LocalAddress("in-process-1")) - .flowControlWindow(65 * 1024) - .channelType(LocalServerChannel.class); - builder.addService(PublisherGrpc.bindService(publisherImpl)); - try { - server = builder.build().start(); - } catch (IOException ex) { - throw new RuntimeException(ex); - } + pubsubHelper = new LocalPubsubHelper().start(); } @AfterClass public static void stopServer() { - server.shutdownNow(); + pubsubHelper.shutdownNow(); } @Before public void setUp() throws Exception { - publisherImpl.reset(); - channel = NettyChannelBuilder - .forAddress(new LocalAddress("in-process-1")) - .negotiationType(NegotiationType.PLAINTEXT) - .channelType(LocalChannel.class) - .build(); - + pubsubHelper.reset(); ServiceApiSettings settings = new ServiceApiSettings(); - settings.setChannel(channel); + settings.setChannel(pubsubHelper.createChannel()); publisherApi = PublisherApi.create(settings); } @After public void tearDown() throws Exception { - if (channel != null) { - channel.shutdown(); - } if (publisherApi != null) { publisherApi.close(); } - publisherImpl.reset(); + pubsubHelper.reset(); } @Test public void testCreateTopic() throws Exception { - String topicName = PublisherApi.createTopic("my-project", "my-topic"); - + String topicName = PublisherApi.createTopicPath("my-project", "my-topic"); Topic result = publisherApi.createTopic(topicName); Assert.assertEquals(topicName, result.getName()); - - Assert.assertEquals(1, publisherImpl.getTopics().size()); - Assert.assertNotNull(publisherImpl.getTopics().get(topicName)); + Assert.assertEquals(1, pubsubHelper.getPublisherImpl().getTopics().size()); + Assert.assertNotNull(pubsubHelper.getPublisherImpl().getTopics().get(topicName)); } @Test public void testPublish() throws Exception { - String topicName = PublisherApi.createTopic("my-project", "publish-topic"); - + String topicName = PublisherApi.createTopicPath("my-project", "publish-topic"); publisherApi.createTopic(topicName); PubsubMessage msg = PubsubMessage.newBuilder() .setData(ByteString.copyFromUtf8("pubsub-message")) .build(); publisherApi.publish(topicName, Collections.singletonList(msg)); - List publishedMessages = publisherImpl.getTopics().get(topicName); + List publishedMessages = + pubsubHelper.getPublisherImpl().getTopics().get(topicName); Assert.assertEquals(1, publishedMessages.size()); Assert.assertEquals("pubsub-message", publishedMessages.get(0).getData().toStringUtf8()); } @Test public void testGetTopic() throws Exception { - String topicName = PublisherApi.createTopic("my-project", "fun-topic"); - + String topicName = PublisherApi.createTopicPath("my-project", "fun-topic"); publisherApi.createTopic(topicName); Topic result = publisherApi.getTopic(topicName); Assert.assertNotNull(result); @@ -146,15 +96,13 @@ public void testGetTopic() throws Exception { @Test public void testListTopics() throws Exception { - String project1 = PublisherApi.createProject("project.1"); - String topicName1 = PublisherApi.createTopic("project.1", "topic.1"); - String topicName2 = PublisherApi.createTopic("project.1", "topic.2"); - String topicName3 = PublisherApi.createTopic("project.2", "topic.3"); - + String project1 = PublisherApi.createProjectPath("project.1"); + String topicName1 = PublisherApi.createTopicPath("project.1", "topic.1"); + String topicName2 = PublisherApi.createTopicPath("project.1", "topic.2"); + String topicName3 = PublisherApi.createTopicPath("project.2", "topic.3"); publisherApi.createTopic(topicName1); publisherApi.createTopic(topicName2); publisherApi.createTopic(topicName3); - List topics = new ArrayList<>(); for (Topic topic : publisherApi.listTopics(project1)) { topics.add(topic); @@ -166,10 +114,9 @@ public void testListTopics() throws Exception { @Test public void testDeleteTopic() throws Exception { - String topicName = PublisherApi.createTopic("my-project", "fun-topic"); - + String topicName = PublisherApi.createTopicPath("my-project", "fun-topic"); publisherApi.createTopic(topicName); publisherApi.deleteTopic(topicName); - Assert.assertEquals(0, publisherImpl.getTopics().size()); + Assert.assertEquals(0, pubsubHelper.getPublisherImpl().getTopics().size()); } } From add0cc33ec52e228f98ca4a0a1d29ffa3fa5f866 Mon Sep 17 00:00:00 2001 From: Garrett Jones Date: Thu, 19 Nov 2015 10:35:52 -0800 Subject: [PATCH 4/5] More updates to address PR comments * Making template constants private * Adding helper methods to extract variables from template paths * Making it possible to have multiple instances of LocalPubsubHelper * Removing getChannel from *Api classes (not used anywhere) * Other cleanup --- .../gcloud/pubsub/spi/PublisherApi.java | 30 +++++++++++----- .../gcloud/pubsub/spi/SubscriberApi.java | 36 ++++++++++++++----- .../spi/testing/LocalPublisherImpl.java | 4 +-- .../pubsub/testing/LocalPubsubHelper.java | 13 ++++--- .../gcloud/pubsub/spi/PublisherApiTest.java | 2 +- 5 files changed, 60 insertions(+), 25 deletions(-) diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java index 08d4181c8fd3..cb78e3f9c728 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java @@ -49,7 +49,6 @@ import io.gapi.gax.grpc.ServiceApiSettings; import io.gapi.gax.internal.ApiUtils; import io.gapi.gax.protobuf.PathTemplate; -import io.grpc.Channel; import io.grpc.ManagedChannel; import java.io.IOException; @@ -151,14 +150,14 @@ public Iterable extractResources(ListTopicSubscriptionsResponse payload) * A PathTemplate representing the fully-qualified path to represent * a project resource. */ - public static final PathTemplate PROJECT_PATH_TEMPLATE = + private static final PathTemplate PROJECT_PATH_TEMPLATE = PathTemplate.create("/projects/{project}"); /** * A PathTemplate representing the fully-qualified path to represent * a topic resource. */ - public static final PathTemplate TOPIC_PATH_TEMPLATE = + private static final PathTemplate TOPIC_PATH_TEMPLATE = PathTemplate.create("/projects/{project}/topics/{topic}"); // ======== @@ -220,13 +219,28 @@ public static final String createTopicPath(String project, String topic) { "project", project,"topic", topic); } + /** + * Extracts the project from the given fully-qualified path which + * represents a project resource. + */ + public static final String extractProjectFromProjectPath(String projectPath) { + return PROJECT_PATH_TEMPLATE.parse(projectPath).get("project"); + } - // ======== - // Getters - // ======== + /** + * Extracts the project from the given fully-qualified path which + * represents a topic resource. + */ + public static final String extractProjectFromTopicPath(String topicPath) { + return TOPIC_PATH_TEMPLATE.parse(topicPath).get("project"); + } - public Channel getChannel() { - return channel; + /** + * Extracts the topic from the given fully-qualified path which + * represents a topic resource. + */ + public static final String extractTopicFromTopicPath(String topicPath) { + return TOPIC_PATH_TEMPLATE.parse(topicPath).get("topic"); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java index 6ec9195d25a2..1c9daaf82384 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java @@ -50,7 +50,6 @@ import io.gapi.gax.grpc.ServiceApiSettings; import io.gapi.gax.internal.ApiUtils; import io.gapi.gax.protobuf.PathTemplate; -import io.grpc.Channel; import io.grpc.ManagedChannel; import java.io.IOException; @@ -132,14 +131,14 @@ public Iterable extractResources(ListSubscriptionsResponse payload * A PathTemplate representing the fully-qualified path to represent * a project resource. */ - public static final PathTemplate PROJECT_PATH_TEMPLATE = + private static final PathTemplate PROJECT_PATH_TEMPLATE = PathTemplate.create("/projects/{project}"); /** * A PathTemplate representing the fully-qualified path to represent * a subscription resource. */ - public static final PathTemplate SUBSCRIPTION_PATH_TEMPLATE = + private static final PathTemplate SUBSCRIPTION_PATH_TEMPLATE = PathTemplate.create("/projects/{project}/subscriptions/{subscription}"); // ======== @@ -168,7 +167,11 @@ public static SubscriberApi create(ServiceApiSettings settings) throws IOExcepti return new SubscriberApi(settings); } - private SubscriberApi(ServiceApiSettings settings) throws IOException { + /** + * Constructs an instance of SubscriberApi, using the given settings. This is protected so that it + * easy to make a subclass, but otherwise, the static factory methods should be preferred. + */ + protected SubscriberApi(ServiceApiSettings settings) throws IOException { ServiceApiSettings internalSettings = ApiUtils.settingsWithChannels(settings, SERVICE_ADDRESS, DEFAULT_SERVICE_PORT, ALL_SCOPES); this.settings = internalSettings; @@ -197,13 +200,28 @@ public static final String createSubscriptionPath(String project, String subscri "project", project,"subscription", subscription); } + /** + * Extracts the project from the given fully-qualified path which + * represents a project resource. + */ + public static final String extractProjectFromProjectPath(String projectPath) { + return PROJECT_PATH_TEMPLATE.parse(projectPath).get("project"); + } - // ======== - // Getters - // ======== + /** + * Extracts the project from the given fully-qualified path which + * represents a subscription resource. + */ + public static final String extractProjectFromSubscriptionPath(String subscriptionPath) { + return SUBSCRIPTION_PATH_TEMPLATE.parse(subscriptionPath).get("project"); + } - public Channel getChannel() { - return channel; + /** + * Extracts the subscription from the given fully-qualified path which + * represents a subscription resource. + */ + public static final String extractSubscriptionFromSubscriptionPath(String subscriptionPath) { + return SUBSCRIPTION_PATH_TEMPLATE.parse(subscriptionPath).get("subscription"); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java index b9c5f9d513bd..6ec1c008f6d0 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java @@ -77,8 +77,8 @@ public void getTopic(GetTopicRequest request, StreamObserver responseObse public void listTopics(ListTopicsRequest request, StreamObserver responseObserver) { List responseTopics = new ArrayList<>(); for (String topicName : topics.keySet()) { - String projectOfTopic = PublisherApi.TOPIC_PATH_TEMPLATE.parse(topicName).get("project"); - String projectOfRequest = PublisherApi.PROJECT_PATH_TEMPLATE.parse(request.getProject()).get("project"); + String projectOfTopic = PublisherApi.extractProjectFromTopicPath(topicName); + String projectOfRequest = PublisherApi.extractProjectFromProjectPath(request.getProject()); if (projectOfTopic.equals(projectOfRequest)) { Topic topicObj = Topic.newBuilder().setName(topicName).build(); responseTopics.add(topicObj); diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java index 13fbf5208047..033330ad6b83 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java @@ -19,19 +19,22 @@ * A class that runs an in-memory Publisher instance for use in tests. */ public class LocalPubsubHelper { - private static final SocketAddress address = new LocalAddress("in-process-1"); - private static Server server; - private static LocalPublisherImpl publisherImpl; + private static int FLOW_CONTROL_WINDOW = 65 * 1024; + + private final SocketAddress address; + private final Server server; + private final LocalPublisherImpl publisherImpl; /** * Constructs a new LocalPubsubHelper. The method start() must * be called before it is used. */ - public LocalPubsubHelper() { + public LocalPubsubHelper(String addressString) { + address = new LocalAddress(addressString); publisherImpl = new LocalPublisherImpl(); NettyServerBuilder builder = NettyServerBuilder .forAddress(address) - .flowControlWindow(65 * 1024) + .flowControlWindow(FLOW_CONTROL_WINDOW) .channelType(LocalServerChannel.class); builder.addService(PublisherGrpc.bindService(publisherImpl)); server = builder.build(); diff --git a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java index 96bdd9106497..19939e0876b6 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java @@ -38,7 +38,7 @@ public class PublisherApiTest { @BeforeClass public static void startStaticServer() { - pubsubHelper = new LocalPubsubHelper().start(); + pubsubHelper = new LocalPubsubHelper("in-process-1").start(); } @AfterClass From b9d8555a9b68f25050827f04087c64a3862f09d1 Mon Sep 17 00:00:00 2001 From: Garrett Jones Date: Thu, 19 Nov 2015 16:18:07 -0800 Subject: [PATCH 5/5] Third round of updates to address PR comments * ApiUtils: Fixing usage of credentials when channel is null * ApiUtils: Function rename for clarity * ApiUtils, ServiceApiSettings: improving comments * ApiCallable: Removing obsolete stuff * not returning 'this' from LocalPubsubHelper.start() --- .../java/io/gapi/gax/grpc/ApiCallable.java | 18 ------ .../io/gapi/gax/grpc/ServiceApiSettings.java | 22 ++++--- .../java/io/gapi/gax/internal/ApiUtils.java | 61 ++++++++++++------- .../gcloud/pubsub/spi/PublisherApi.java | 2 +- .../gcloud/pubsub/spi/SubscriberApi.java | 2 +- .../pubsub/testing/LocalPubsubHelper.java | 3 +- .../gcloud/pubsub/spi/PublisherApiTest.java | 3 +- 7 files changed, 57 insertions(+), 54 deletions(-) diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java index edaa0885d46f..2e563a4413d0 100644 --- a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java +++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java @@ -62,8 +62,6 @@ public abstract class ApiCallable { // TODO(wrwg): Support interceptors and method/call option configurations. - // TODO(wrwg): gather more feedback whether the overload with java.util.Concurrent hurts that - // much that we want to rename this into ClientCallable or such. // Subclass Contract // ================= @@ -390,20 +388,4 @@ public ApiCallable retrying() { return new PageStreamingCallable(this, pageDescriptor); } - /** - * Returns a callable which behaves the same as {@link #pageStreaming(PageDescriptor)}, with - * the page descriptor attempted to derive from the callable descriptor. - * - * @throws IllegalArgumentException if a page descriptor is not derivable. - */ - public ApiCallable - pageStreaming(Class resourceType) { - PageDescriptor pageDescriptor = - getDescriptor() != null ? getDescriptor().getPageDescriptor(resourceType) : null; - if (pageDescriptor == null) { - throw new IllegalArgumentException(String.format( - "cannot derive page descriptor for '%s'", this)); - } - return pageStreaming(pageDescriptor); - } } diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java index 13da3abe911b..eb3ca2b7a9d9 100644 --- a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java +++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java @@ -35,12 +35,15 @@ import io.grpc.ManagedChannel; +/** + * A settings class to configure a service api class. + */ public class ServiceApiSettings { private boolean isIdempotentRetrying; private Credentials credentials; - private String servicePath; + private String serviceAddress; private int port; private ManagedChannel channel; @@ -48,7 +51,7 @@ public class ServiceApiSettings { public ServiceApiSettings() { isIdempotentRetrying = true; credentials = null; - servicePath = null; + serviceAddress = null; port = 0; } @@ -69,7 +72,8 @@ public boolean getIsIdempotentRetrying() { /** * Sets the credentials to use in order to call the service. The default is to acquire - * the credentials using GoogleCredentials.getApplicationDefault(). + * the credentials using GoogleCredentials.getApplicationDefault(). These credentials + * will not be used if the channel is set. */ public ServiceApiSettings setCredentials(Credentials credentials) { this.credentials = credentials; @@ -81,19 +85,19 @@ public Credentials getCredentials() { } /** - * The path used to reach the service. + * The path used to reach the service. This value will not be used if the channel is set. */ - public ServiceApiSettings setServicePath(String servicePath) { - this.servicePath = servicePath; + public ServiceApiSettings setServiceAddress(String serviceAddress) { + this.serviceAddress = serviceAddress; return this; } - public String getServicePath() { - return servicePath; + public String getServiceAddress() { + return serviceAddress; } /** - * The port used to reach the service. + * The port used to reach the service. This value will not be used if the channel is set. */ public ServiceApiSettings setPort(int port) { this.port = port; diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java b/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java index 6673726f761e..3327db0c0211 100644 --- a/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java +++ b/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java @@ -31,12 +31,7 @@ package io.gapi.gax.internal; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Executors; - +import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.collect.Lists; @@ -48,6 +43,11 @@ import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executors; + public class ApiUtils { // TODO(wgg): make this configurable @@ -63,17 +63,26 @@ public static ApiCallable prepareIdem } /** - * Creates a channel for the given path, address and port. + * Acquires application-default credentials, applying the given scopes if the + * credentials require scopes. + */ + public static Credentials credentialsWithScopes(String scopes[]) throws IOException { + List scopeList = Arrays.asList(scopes); + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); + if (credentials.createScopedRequired()) { + credentials = credentials.createScoped(scopeList); + } + return credentials; + } + + /** + * Creates a channel for the given address, port, and credentials. */ - public static ManagedChannel createChannel(String address, int port, Collection scopes) + public static ManagedChannel createChannel(String address, int port, Credentials credentials) throws IOException { List interceptors = Lists.newArrayList(); //TODO: MIGRATION interceptors.add(ChannelFactory.authorityInterceptor(address)); - GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); - if (credentials.createScopedRequired()) { - credentials = credentials.createScoped(scopes); - } interceptors.add(new ClientAuthInterceptor(credentials, Executors.newFixedThreadPool(AUTH_THREADS))); @@ -84,23 +93,31 @@ public static ManagedChannel createChannel(String address, int port, Collection< .build(); } - public static ServiceApiSettings settingsWithChannels(ServiceApiSettings settings, - String defaultServicePath, int defaultServicePort, String scopes[]) throws IOException { + /** + * Creates a new instance of ServiceApiSettings with all fields populated, using + * the given defaults if the corresponding values are not set on ServiceApiSettings. + */ + public static ServiceApiSettings populateSettings(ServiceApiSettings settings, + String defaultServiceAddress, int defaultServicePort, String scopes[]) throws IOException { ManagedChannel channel = settings.getChannel(); if (channel == null) { - String servicePath = defaultServicePath; - if (settings.getServicePath() != null) { - servicePath = settings.getServicePath(); + String servicePath = settings.getServiceAddress(); + if (servicePath == null) { + servicePath = defaultServiceAddress; + } + + int port = settings.getPort(); + if (port == 0) { + port = defaultServicePort; } - int port = defaultServicePort; - if (settings.getPort() != 0) { - port = settings.getPort(); + Credentials credentials = settings.getCredentials(); + if (credentials == null) { + credentials = credentialsWithScopes(scopes); } - List scopeList = Arrays.asList(scopes); - channel = ApiUtils.createChannel(servicePath, port, scopeList); + channel = ApiUtils.createChannel(servicePath, port, credentials); } return new ServiceApiSettings() diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java index cb78e3f9c728..54572afa2e6d 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java @@ -191,7 +191,7 @@ public static PublisherApi create(ServiceApiSettings settings) throws IOExceptio * easy to make a subclass, but otherwise, the static factory methods should be preferred. */ protected PublisherApi(ServiceApiSettings settings) throws IOException { - ServiceApiSettings internalSettings = ApiUtils.settingsWithChannels(settings, + ServiceApiSettings internalSettings = ApiUtils.populateSettings(settings, SERVICE_ADDRESS, DEFAULT_SERVICE_PORT, ALL_SCOPES); this.settings = internalSettings; this.channel = internalSettings.getChannel(); diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java index 1c9daaf82384..ff320597ddf7 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java @@ -172,7 +172,7 @@ public static SubscriberApi create(ServiceApiSettings settings) throws IOExcepti * easy to make a subclass, but otherwise, the static factory methods should be preferred. */ protected SubscriberApi(ServiceApiSettings settings) throws IOException { - ServiceApiSettings internalSettings = ApiUtils.settingsWithChannels(settings, + ServiceApiSettings internalSettings = ApiUtils.populateSettings(settings, SERVICE_ADDRESS, DEFAULT_SERVICE_PORT, ALL_SCOPES); this.settings = internalSettings; this.channel = internalSettings.getChannel(); diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java index 033330ad6b83..b9c17e0f0831 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java @@ -43,13 +43,12 @@ public LocalPubsubHelper(String addressString) { /** * Starts the in-memory service. */ - public LocalPubsubHelper start() { + public void start() { try { server.start(); } catch (IOException ex) { throw new RuntimeException(ex); } - return this; } /** diff --git a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java index 19939e0876b6..38e337890aa1 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java @@ -38,7 +38,8 @@ public class PublisherApiTest { @BeforeClass public static void startStaticServer() { - pubsubHelper = new LocalPubsubHelper("in-process-1").start(); + pubsubHelper = new LocalPubsubHelper("in-process-1"); + pubsubHelper.start(); } @AfterClass