Skip to content

Commit

Permalink
return Name objects instead of plain String (#1562)
Browse files Browse the repository at this point in the history
* return Name objects instead of plain String
  • Loading branch information
pongad authored Jan 25, 2017
1 parent 10bedac commit 16957b0
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.SubscriptionName;
Expand Down Expand Up @@ -52,7 +51,7 @@ public ListenableFuture<MessageReceiver.AckReply> receiveMessage(PubsubMessage m
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.Builder.newBuilder(subscription, receiver).build();
subscriber = Subscriber.newBuilder(subscription, receiver).build();
subscriber.addListener(
new Subscriber.SubscriberListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static void main(String... args) throws Exception {

Publisher publisher = null;
try {
publisher = Publisher.Builder.newBuilder(topic).build();
publisher = Publisher.newBuilder(topic).build();
List<String> messages = Arrays.asList("first message", "second message");
List<ListenableFuture<String>> messageIds = new ArrayList<>();
for (String message : messages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
*
* <pre><code>
* Publisher publisher =
* Publisher.Builder.newBuilder(MY_TOPIC)
* Publisher.newBuilder(MY_TOPIC)
* .setMaxBundleDuration(new Duration(10 * 1000))
* .build();
* List&lt;ListenableFuture&lt;String&gt;&gt; results = new ArrayList&lt;&gt;();
Expand Down Expand Up @@ -122,7 +122,8 @@ public static long getApiMaxRequestBytes() {

private static final Logger logger = LoggerFactory.getLogger(Publisher.class);

private final String topic;
private final TopicName topicName;
private final String cachedTopicNameString;

private final BundlingSettings bundlingSettings;
private final RetrySettings retrySettings;
Expand All @@ -149,7 +150,8 @@ public static long getApiMaxRequestBytes() {
private ScheduledFuture<?> currentAlarmFuture;

private Publisher(Builder builder) throws IOException {
topic = builder.topic;
topicName = builder.topicName;
cachedTopicNameString = topicName.toString();

this.bundlingSettings = builder.bundlingSettings;
this.retrySettings = builder.retrySettings;
Expand Down Expand Up @@ -198,8 +200,8 @@ public void close() throws IOException {
}

/** Topic which the publisher publishes to. */
public String getTopic() {
return topic;
public TopicName getTopicName() {
return topicName;
}

/**
Expand Down Expand Up @@ -333,7 +335,7 @@ private void publishAllOutstanding() {

private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) {
PublishRequest.Builder publishRequest = PublishRequest.newBuilder();
publishRequest.setTopic(topic);
publishRequest.setTopic(cachedTopicNameString);
for (OutstandingPublish outstandingPublish : outstandingBundle.outstandingPublishes) {
publishRequest.addMessages(outstandingPublish.message);
}
Expand Down Expand Up @@ -528,6 +530,11 @@ interface LongRandom {
long nextLong(long least, long bound);
}

/** Constructs a new {@link Builder} using the given topic. */
public static Builder newBuilder(TopicName topicName) {
return new Builder(topicName);
}

/** A builder of {@link Publisher}s. */
public static final class Builder {
static final Duration MIN_TOTAL_TIMEOUT = new Duration(10 * 1000); // 10 seconds
Expand Down Expand Up @@ -569,7 +576,7 @@ public long nextLong(long least, long bound) {
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
.build();

String topic;
TopicName topicName;

// Bundling options
BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
Expand All @@ -588,13 +595,8 @@ public long nextLong(long least, long bound) {

ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;

/** Constructs a new {@link Builder} using the given topic. */
public static Builder newBuilder(TopicName topic) {
return new Builder(topic.toString());
}

Builder(String topic) {
this.topic = Preconditions.checkNotNull(topic);
private Builder(TopicName topic) {
this.topicName = Preconditions.checkNotNull(topic);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import com.google.common.util.concurrent.Service;
import com.google.pubsub.v1.SubscriptionName;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.GrpcSslContexts;
Expand All @@ -43,9 +41,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,7 +88,7 @@
* }
*
* Subscriber subscriber =
* Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
* Subscriber.newBuilder(MY_SUBSCRIPTION, receiver)
* .setMaxBundleAcks(100)
* .build();
*
Expand Down Expand Up @@ -123,19 +123,33 @@ private Subscriber(Builder builder) throws IOException {
impl = new SubscriberImpl(builder);
}

/**
* Constructs a new {@link Builder}.
*
* <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
* Subscriber}.
*
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
* @param receiver an implementation of {@link MessageReceiver} used to process the received
* messages
*/
public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) {
return new Builder(subscription, receiver);
}

/** Subscription which the subscriber is subscribed to. */
public String getSubscription() {
return impl.getSubscription();
public SubscriptionName getSubscriptionName() {
return impl.subscriptionName;
}

/** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
public Duration getAckExpirationPadding() {
return impl.getAckExpirationPadding();
return impl.ackExpirationPadding;
}

/** The flow control settings the Subscriber is configured with. */
public FlowController.Settings getFlowControlSettings() {
return impl.getFlowControlSettings();
return impl.flowControlSettings;
}

public void addListener(final SubscriberListener listener, Executor executor) {
Expand Down Expand Up @@ -249,7 +263,8 @@ public void terminated(State from) {}
private static class SubscriberImpl extends AbstractService {
private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);

private final String subscription;
private final SubscriptionName subscriptionName;
private final String cachedSubscriptionNameString;
private final FlowController.Settings flowControlSettings;
private final Duration ackExpirationPadding;
private final ScheduledExecutorService executor;
Expand All @@ -270,7 +285,8 @@ private static class SubscriberImpl extends AbstractService {
private SubscriberImpl(Builder builder) throws IOException {
receiver = builder.receiver;
flowControlSettings = builder.flowControlSettings;
subscription = builder.subscription;
subscriptionName = builder.subscriptionName;
cachedSubscriptionNameString = subscriptionName.toString();
ackExpirationPadding = builder.ackExpirationPadding;
streamAckDeadlineSeconds =
Math.max(
Expand Down Expand Up @@ -340,7 +356,7 @@ private void startStreamingConnections() {
for (int i = 0; i < numChannels; i++) {
streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
subscription,
cachedSubscriptionNameString,
credentials,
receiver,
ackExpirationPadding,
Expand Down Expand Up @@ -412,7 +428,7 @@ private void startPollingConnections() {
for (int i = 0; i < numChannels; i++) {
pollingSubscriberConnections.add(
new PollingSubscriberConnection(
subscription,
cachedSubscriptionNameString,
credentials,
receiver,
ackExpirationPadding,
Expand Down Expand Up @@ -496,21 +512,6 @@ public void run() {
throw new IllegalStateException(e);
}
}

/** Subscription which the subscriber is subscribed to. */
public String getSubscription() {
return subscription;
}

/** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
public Duration getAckExpirationPadding() {
return ackExpirationPadding;
}

/** The flow control settings the Subscriber is configured with. */
public FlowController.Settings getFlowControlSettings() {
return flowControlSettings;
}
}

/** Builder of {@link Subscriber Subscribers}. */
Expand All @@ -526,7 +527,7 @@ public static final class Builder {
* Runtime.getRuntime().availableProcessors())
.build();

String subscription;
SubscriptionName subscriptionName;
Optional<Credentials> credentials = Optional.absent();
MessageReceiver receiver;

Expand All @@ -539,22 +540,8 @@ public static final class Builder {
Optional.absent();
Optional<Clock> clock = Optional.absent();

/**
* Constructs a new {@link Builder}.
*
* <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
* Subscriber}.
*
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
* @param receiver an implementation of {@link MessageReceiver} used to process the received
* messages
*/
public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) {
return new Builder(subscription.toString(), receiver);
}

Builder(String subscription, MessageReceiver receiver) {
this.subscription = subscription;
Builder(SubscriptionName subscriptionName, MessageReceiver receiver) {
this.subscriptionName = subscriptionName;
this.receiver = receiver;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
public void testPublisherGetters() throws Exception {
FakeCredentials credentials = new FakeCredentials();

Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC);
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
builder.setChannelBuilder(testChannelBuilder);
builder.setCredentials(credentials);
builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
Expand All @@ -370,7 +370,7 @@ public void testPublisherGetters() throws Exception {
.build());
Publisher publisher = builder.build();

assertEquals(TEST_TOPIC.toString(), publisher.getTopic());
assertEquals(TEST_TOPIC, publisher.getTopicName());
assertEquals(10, (long) publisher.getBundlingSettings().getRequestByteThreshold());
assertEquals(new Duration(11), publisher.getBundlingSettings().getDelayThreshold());
assertEquals(12, (long) publisher.getBundlingSettings().getElementCountThreshold());
Expand All @@ -384,8 +384,8 @@ public void testPublisherGetters() throws Exception {

@Test
public void testBuilderParametersAndDefaults() {
Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC);
assertEquals(TEST_TOPIC.toString(), builder.topic);
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
assertEquals(TEST_TOPIC, builder.topicName);
assertEquals(Optional.absent(), builder.channelBuilder);
assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
assertFalse(builder.failOnFlowControlLimits);
Expand All @@ -404,7 +404,7 @@ public void testBuilderParametersAndDefaults() {

@Test
public void testBuilderInvalidArguments() {
Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC);
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);

try {
builder.setChannelBuilder(null);
Expand Down Expand Up @@ -601,7 +601,7 @@ public void testBuilderInvalidArguments() {
}

private Builder getTestPublisherBuilder() {
return Publisher.Builder.newBuilder(TEST_TOPIC)
return Publisher.newBuilder(TEST_TOPIC)
.setCredentials(testCredentials)
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
.setChannelBuilder(testChannelBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service.State;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullResponse;
Expand Down Expand Up @@ -467,7 +466,7 @@ private void sendMessages(Iterable<String> ackIds) throws InterruptedException {
}

private Builder getTestSubscriberBuilder(MessageReceiver receiver) {
return Subscriber.Builder.newBuilder(TEST_SUBSCRIPTION, receiver)
return Subscriber.newBuilder(TEST_SUBSCRIPTION, receiver)
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
.setCredentials(testCredentials)
.setChannelBuilder(testChannelBuilder)
Expand Down

0 comments on commit 16957b0

Please # to comment.