ackIds) {
- ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
- .setSubscription(
- SubscriberClient.formatSubscriptionName(getOptions().getProjectId(), subscription))
- .setAckDeadlineSeconds((int) TimeUnit.SECONDS.convert(deadline, unit))
- .addAllAckIds(ackIds)
- .build();
- return transform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
- }
-
@Override
public Policy getTopicPolicy(String topic) {
return get(getTopicPolicyAsync(topic));
@@ -761,8 +621,5 @@ public void close() throws Exception {
}
closed = true;
rpc.close();
- if (ackDeadlineRenewer != null) {
- ackDeadlineRenewer.close();
- }
}
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/ReceivedMessage.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/ReceivedMessage.java
deleted file mode 100644
index 4afbef2e5a9b..000000000000
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/ReceivedMessage.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Copyright 2016 Google Inc. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsub;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.ByteArray;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A Google Cloud Pub/Sub received message. A received message has all the information in
- * {@link Message} as well as the acknowledge id. The ack id can be used to acknowledge the received
- * message.
- *
- * {@code ReceivedMessage} also adds a layer of service-related functionality over
- * {@link Message} that help manage received messages (see {@link #ack()}, {@link #nack()} and
- * {@link #modifyAckDeadline(int, TimeUnit)}).
- */
-public final class ReceivedMessage extends Message {
-
- private static final long serialVersionUID = -4178477763916251733L;
-
- private final String subscription;
- private final String ackId;
- private transient PubSub pubsub;
- private final PubSubOptions options;
-
- public static final class Builder extends Message.Builder {
-
- private final String subscription;
- private final String ackId;
- private final PubSub pubsub;
- private final BuilderImpl delegate;
-
- private Builder(String subscription, String ackId, PubSub pubsub, BuilderImpl delegate) {
- this.subscription = subscription;
- this.ackId = ackId;
- this.pubsub = pubsub;
- this.delegate = delegate;
- }
-
- @Override
- Builder setId(String id) {
- delegate.setId(id);
- return this;
- }
-
- @Override
- @Deprecated
- public Builder payload(String payload) {
- return setPayload(payload);
- }
-
- @Override
- public Builder setPayload(String payload) {
- delegate.setPayload(payload);
- return this;
- }
-
- @Override
- @Deprecated
- public Builder payload(ByteArray payload) {
- return setPayload(payload);
- }
-
- @Override
- public Builder setPayload(ByteArray payload) {
- delegate.setPayload(payload);
- return this;
- }
-
- @Override
- @Deprecated
- public Builder attributes(Map attributes) {
- return setAttributes(attributes);
- }
-
- @Override
- public Builder setAttributes(Map attributes) {
- delegate.setAttributes(attributes);
- return this;
- }
-
- @Override
- public Builder addAttribute(String name, String value) {
- delegate.addAttribute(name, value);
- return this;
- }
-
- @Override
- public Builder removeAttribute(String name) {
- delegate.removeAttribute(name);
- return this;
- }
-
- @Override
- public Builder clearAttributes() {
- delegate.clearAttributes();
- return this;
- }
-
- @Override
- Builder setPublishTime(long publishTime) {
- delegate.setPublishTime(publishTime);
- return this;
- }
-
- @Override
- public ReceivedMessage build() {
- return new ReceivedMessage(this);
- }
- }
-
- ReceivedMessage(Builder builder) {
- super(builder.delegate);
- subscription = checkNotNull(builder.subscription);
- ackId = checkNotNull(builder.ackId);
- pubsub = checkNotNull(builder.pubsub);
- options = pubsub.getOptions();
- }
-
- @Override
- public Builder toBuilder() {
- return new Builder(subscription, ackId, pubsub, new BuilderImpl(this));
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(options, super.hashCode());
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null || !obj.getClass().equals(ReceivedMessage.class)) {
- return false;
- }
- ReceivedMessage other = (ReceivedMessage) obj;
- return baseEquals(other) && Objects.equals(options, other.options);
- }
-
- /**
- * Returns the received message's {@code PubSub} object used to issue requests.
- */
- @Deprecated
- public PubSub pubsub() {
- return getPubsub();
- }
-
- /**
- * Returns the received message's {@code PubSub} object used to issue requests.
- */
- public PubSub getPubsub() {
- return pubsub;
- }
-
- /**
- * Returns the name of the subscription this message was received from.
- */
- @Deprecated
- public String subscription() {
- return getSubscription();
- }
-
- /**
- * Returns the name of the subscription this message was received from.
- */
- public String getSubscription() {
- return subscription;
- }
-
- /**
- * Returns the acknowledge id of the message. The ack id can be used to acknowledge the received
- * message.
- */
- @Deprecated
- public String ackId() {
- return getAckId();
- }
-
- /**
- * Returns the acknowledge id of the message. The ack id can be used to acknowledge the received
- * message.
- */
- public String getAckId() {
- return ackId;
- }
-
- /**
- * Acknowledges the current message.
- *
- * @throws PubSubException upon failure, or if the subscription was not found
- */
- public void ack() {
- pubsub.ack(subscription, ackId);
- }
-
- /**
- * Sends a request to acknowledge the current message. The method returns a {@code Future} object
- * that can be used to wait for the acknowledge operation to be completed.
- *
- * @throws PubSubException upon failure, or if the subscription was not found
- */
- public Future ackAsync() {
- return pubsub.ackAsync(subscription, ackId);
- }
-
- /**
- * "Nacks" the current message. This method corresponds to calling
- * {@link #modifyAckDeadline(int, TimeUnit)} with a deadline of 0.
- *
- * @throws PubSubException upon failure, or if the subscription was not found
- */
- public void nack() {
- pubsub.nack(subscription, ackId);
- }
-
- /**
- * Sends a request to "nack" the current message. This method corresponds to calling
- * {@link #modifyAckDeadlineAsync(int, TimeUnit)} with a deadline of 0. The method returns a
- * {@code Future} object that can be used to wait for the "nack" operation to be completed.
- *
- * @throws PubSubException upon failure, or if the subscription was not found
- */
- public Future nackAsync() {
- return pubsub.nackAsync(subscription, ackId);
- }
-
- /**
- * Modifies the acknowledge deadline of the current message. {@code deadline} must be >= 0 and
- * is the new deadline with respect to the time the modify request was received by the Pub/Sub
- * service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
- * the new ack deadline will expire 10 seconds after the modify request was received by the
- * service. Specifying 0 may be used to make the message available for another pull request
- * (corresponds to calling {@link #nack()}.
- *
- * @param deadline the new deadline, relative to the time the modify request is received by the
- * Pub/Sub service
- * @param unit {@code deadline} time unit
- * @throws PubSubException upon failure, or if the subscription was not found
- */
- public void modifyAckDeadline(int deadline, TimeUnit unit) {
- pubsub.modifyAckDeadline(subscription, deadline, unit, ackId);
- }
-
- /**
- * Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
- * must be >= 0 and is the new deadline with respect to the time the modify request was
- * received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
- * {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
- * was received by the service. Specifying 0 may be used to make the message available for another
- * pull request (corresponds to calling {@link #nackAsync()}. The method returns a {@code Future}
- * object that can be used to wait for the modify operation to be completed.
- *
- * @param deadline the new deadline, relative to the time the modify request is received by the
- * Pub/Sub service
- * @param unit {@code deadline} time unit
- * @throws PubSubException upon failure, or if the subscription was not found
- */
- public Future modifyAckDeadlineAsync(int deadline, TimeUnit unit) {
- return pubsub.modifyAckDeadlineAsync(subscription, deadline, unit, ackId);
- }
-
- private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
- input.defaultReadObject();
- this.pubsub = options.getService();
- }
-
- static ReceivedMessage fromPb(PubSub pubsub, String subscription,
- com.google.pubsub.v1.ReceivedMessage msgPb) {
- Message message = fromPb(msgPb.getMessage());
- String ackId = msgPb.getAckId();
- return new Builder(subscription, ackId, pubsub, new BuilderImpl(message)).build();
- }
-}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java
index 85a270a5fcc0..f0f589410300 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java
@@ -20,8 +20,6 @@
import com.google.cloud.GrpcServiceOptions;
import com.google.cloud.Policy;
-import com.google.cloud.pubsub.PubSub.MessageConsumer;
-import com.google.cloud.pubsub.PubSub.MessageProcessor;
import com.google.cloud.pubsub.PubSub.PullOption;
import com.google.common.base.Function;
@@ -343,98 +341,6 @@ public Future replacePushConfigAsync(PushConfig pushConfig) {
return pubsub.replacePushConfigAsync(getName(), pushConfig);
}
- /**
- * Pulls messages from this subscription. This method possibly returns no messages if no message
- * was available at the time the request was processed by the Pub/Sub service (i.e. the system is
- * not allowed to wait until at least one message is available). Pulled messages have their
- * acknowledge deadline automatically renewed until they are explicitly consumed using
- * {@link Iterator#next()}.
- *
- * Example of pulling a maximum number of messages from the subscription.
- *
{@code
- * Iterator messages = subscription.pull(100);
- * // Ack deadline is renewed until the message is consumed
- * while (messages.hasNext()) {
- * ReceivedMessage message = messages.next();
- * // do something with message and ack/nack it
- * message.ack(); // or message.nack()
- * }
- * }
- *
- * @param maxMessages the maximum number of messages pulled by this method. This method can
- * possibly return fewer messages.
- * @throws PubSubException upon failure
- */
- public Iterator pull(int maxMessages) {
- return pubsub.pull(getName(), maxMessages);
- }
-
- /**
- * Sends a request for pulling messages from this subscription. This method returns a
- * {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
- * This method possibly returns no messages if no message was available at the time the request
- * was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
- * message is available).
- *
- * Example of asynchronously pulling a maximum number of messages from the subscription.
- *
{@code
- * Future> future = subscription.pullAsync(100);
- * // ...
- * Iterator messages = future.get();
- * // Ack deadline is renewed until the message is consumed
- * while (messages.hasNext()) {
- * ReceivedMessage message = messages.next();
- * // do something with message and ack/nack it
- * message.ack(); // or message.nack()
- * }
- * }
- *
- * @param maxMessages the maximum number of messages pulled by this method. This method can
- * possibly return fewer messages.
- * @throws PubSubException upon failure
- */
- public Future> pullAsync(int maxMessages) {
- return pubsub.pullAsync(getName(), maxMessages);
- }
-
- /**
- * Creates a message consumer that pulls messages from this subscription. You can stop pulling
- * messages by calling {@link MessageConsumer#close()}. The returned message consumer executes
- * {@link MessageProcessor#process(Message)} on each pulled message. If
- * {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If
- * {@link MessageProcessor#process(Message)} throws an exception, the message is "nacked". For
- * all pulled messages, the ack deadline is automatically renewed until the message is either
- * acknowledged or "nacked".
- *
- * The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum
- * number of queued messages (messages either being processed or waiting to be processed). The
- * {@link PullOption#executorFactory(GrpcServiceOptions.ExecutorFactory)} can be used to provide
- * an executor to run message processor callbacks.
- *
- *
Example of continuously pulling messages from the subscription.
- *
{@code
- * String subscriptionName = "my_subscription_name";
- * MessageProcessor callback = new MessageProcessor() {
- * public void process(Message message) throws Exception {
- * // Ack deadline is renewed until this method returns
- * // Message is acked if this method returns successfully
- * // Message is nacked if this method throws an exception
- * }
- * };
- * MessageConsumer consumer = subscription.pullAsync(callback);
- * // ...
- * // Stop pulling
- * consumer.close();
- * }
- *
- * @param callback the callback to be executed on each message
- * @param options pulling options
- * @return a message consumer for the provided subscription and options
- */
- public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
- return pubsub.pullAsync(getName(), callback, options);
- }
-
/**
* Returns the IAM access control policy for this subscription. Returns {@code null} if the
* subscription was not found.
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java
deleted file mode 100644
index ce96a0f4ad80..000000000000
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Copyright 2016 Google Inc. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsub;
-
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
-import com.google.common.collect.ImmutableList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.Timeout;
-import org.junit.Test;
-
-public class AckDeadlineRenewerTest {
-
- private static final int MIN_DEADLINE_MILLIS = 10_000;
- private static final int TIME_ADVANCE = 9_000;
-
- private static final String SUBSCRIPTION1 = "subscription1";
- private static final String SUBSCRIPTION2 = "subscription2";
- private static final String ACK_ID1 = "ack-id1";
- private static final String ACK_ID2 = "ack-id2";
- private static final String ACK_ID3 = "ack-id3";
-
- private PubSub pubsub;
- private FakeScheduledExecutorService executorService;
- private AckDeadlineRenewer ackDeadlineRenewer;
-
- @Rule
- public Timeout globalTimeout = Timeout.seconds(60);
-
- @Before
- public void setUp() {
- pubsub = EasyMock.createStrictMock(PubSub.class);
- executorService = new FakeScheduledExecutorService();
- ExecutorFactory executorFactory = new ExecutorFactory() {
- @Override
- public ExecutorService get() {
- return executorService;
- }
- @Override
- public void release(ExecutorService executor) {
- executorService.shutdown();
- }
- };
- PubSubOptions options = PubSubOptions.newBuilder()
- .setProjectId("projectId")
- .setExecutorFactory(executorFactory)
- .setClock(executorService.getClock())
- .build();
- EasyMock.expect(pubsub.getOptions()).andReturn(options);
- EasyMock.replay(pubsub);
- ackDeadlineRenewer = new AckDeadlineRenewer(pubsub);
- }
-
- @After
- public void tearDown() throws Exception {
- EasyMock.verify(pubsub);
- ackDeadlineRenewer.close();
- }
-
- private IAnswer> createAnswer(final CountDownLatch latch,
- final AtomicLong renewal) {
- return new IAnswer>() {
- @Override
- public Future answer() throws Throwable {
- latch.countDown();
- renewal.set(executorService.getClock().millis());
- return null;
- }
- };
- }
-
- @Test
- public void testAddOneMessage() throws InterruptedException {
- EasyMock.reset(pubsub);
- final CountDownLatch firstLatch = new CountDownLatch(1);
- final CountDownLatch secondLatch = new CountDownLatch(1);
- final AtomicLong firstRenewal = new AtomicLong();
- final AtomicLong secondRenewal = new AtomicLong();
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
- .andAnswer(createAnswer(firstLatch, firstRenewal));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
- .andAnswer(createAnswer(secondLatch, secondRenewal));
- EasyMock.replay(pubsub);
- long addTime = executorService.getClock().millis();
- ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1);
- executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
- firstLatch.await();
- assertTrue(firstRenewal.get() < (addTime + MIN_DEADLINE_MILLIS));
- executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
- secondLatch.await();
- assertTrue(secondRenewal.get() < (firstRenewal.get() + MIN_DEADLINE_MILLIS));
- }
-
- @Test
- public void testAddMessages() throws InterruptedException {
- EasyMock.reset(pubsub);
- final CountDownLatch firstLatch = new CountDownLatch(2);
- final CountDownLatch secondLatch = new CountDownLatch(2);
- final AtomicLong firstRenewalSub1 = new AtomicLong();
- final AtomicLong firstRenewalSub2 = new AtomicLong();
- final AtomicLong secondRenewalSub1 = new AtomicLong();
- final AtomicLong secondRenewalSub2 = new AtomicLong();
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2)))
- .andAnswer(createAnswer(firstLatch, firstRenewalSub1));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
- .andAnswer(createAnswer(firstLatch, firstRenewalSub2));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2)))
- .andAnswer(createAnswer(secondLatch, secondRenewalSub1));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID3)))
- .andAnswer(createAnswer(secondLatch, secondRenewalSub2));
- EasyMock.replay(pubsub);
- long addTime1 = executorService.getClock().millis();
- ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
- ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
- executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
- firstLatch.await();
- assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
- assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
- ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID3);
- executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
- secondLatch.await();
- assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
- assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
- }
-
- @Test
- public void testAddExistingMessage() throws InterruptedException {
- EasyMock.reset(pubsub);
- final CountDownLatch firstLatch = new CountDownLatch(2);
- final CountDownLatch secondLatch = new CountDownLatch(2);
- final AtomicLong firstRenewalSub1 = new AtomicLong();
- final AtomicLong firstRenewalSub2 = new AtomicLong();
- final AtomicLong secondRenewalSub1 = new AtomicLong();
- final AtomicLong secondRenewalSub2 = new AtomicLong();
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2)))
- .andAnswer(createAnswer(firstLatch, firstRenewalSub1));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
- .andAnswer(createAnswer(firstLatch, firstRenewalSub2));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2)))
- .andAnswer(createAnswer(secondLatch, secondRenewalSub1));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
- .andAnswer(createAnswer(secondLatch, secondRenewalSub2));
- EasyMock.replay(pubsub);
- long addTime1 = executorService.getClock().millis();
- ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
- ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
- executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
- firstLatch.await();
- assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
- assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
- ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
- executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
- secondLatch.await();
- assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
- assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
- }
-
- @Test
- public void testRemoveNonExistingMessage() throws InterruptedException {
- EasyMock.reset(pubsub);
- final CountDownLatch firstLatch = new CountDownLatch(2);
- final CountDownLatch secondLatch = new CountDownLatch(2);
- final AtomicLong firstRenewalSub1 = new AtomicLong();
- final AtomicLong firstRenewalSub2 = new AtomicLong();
- final AtomicLong secondRenewalSub1 = new AtomicLong();
- final AtomicLong secondRenewalSub2 = new AtomicLong();
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2)))
- .andAnswer(createAnswer(firstLatch, firstRenewalSub1));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
- .andAnswer(createAnswer(firstLatch, firstRenewalSub2));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2)))
- .andAnswer(createAnswer(secondLatch, secondRenewalSub1));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
- .andAnswer(createAnswer(secondLatch, secondRenewalSub2));
- EasyMock.replay(pubsub);
- long addTime1 = executorService.getClock().millis();
- ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
- ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
- executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
- firstLatch.await();
- assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
- assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
- ackDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID3);
- executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
- secondLatch.await();
- assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
- assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
- }
-
- @Test
- public void testRemoveMessage() throws InterruptedException {
- EasyMock.reset(pubsub);
- final CountDownLatch firstLatch = new CountDownLatch(2);
- final CountDownLatch secondLatch = new CountDownLatch(2);
- final AtomicLong firstRenewalSub1 = new AtomicLong();
- final AtomicLong firstRenewalSub2 = new AtomicLong();
- final AtomicLong secondRenewalSub1 = new AtomicLong();
- final AtomicLong secondRenewalSub2 = new AtomicLong();
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2)))
- .andAnswer(createAnswer(firstLatch, firstRenewalSub1));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
- .andAnswer(createAnswer(firstLatch, firstRenewalSub2));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
- .andAnswer(createAnswer(secondLatch, secondRenewalSub1));
- EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLIS,
- TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
- .andAnswer(createAnswer(secondLatch, secondRenewalSub2));
- EasyMock.replay(pubsub);
- long addTime1 = executorService.getClock().millis();
- ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
- ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
- executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
- firstLatch.await();
- assertTrue(firstRenewalSub1.get() < (addTime1 + MIN_DEADLINE_MILLIS));
- assertTrue(firstRenewalSub2.get() < (addTime1 + MIN_DEADLINE_MILLIS));
- ackDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID2);
- executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
- secondLatch.await();
- assertTrue(secondRenewalSub1.get() < (firstRenewalSub1.get() + MIN_DEADLINE_MILLIS));
- assertTrue(secondRenewalSub2.get() < (firstRenewalSub2.get() + MIN_DEADLINE_MILLIS));
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testClose() throws Exception {
- PubSub pubsub = EasyMock.createStrictMock(PubSub.class);
- ScheduledExecutorService executor = EasyMock.createStrictMock(ScheduledExecutorService.class);
- ExecutorFactory executorFactory = EasyMock.createStrictMock(ExecutorFactory.class);
- EasyMock.expect(executorFactory.get()).andReturn(executor);
- PubSubOptions options = PubSubOptions.newBuilder()
- .setProjectId("projectId")
- .setExecutorFactory(executorFactory)
- .build();
- EasyMock.expect(pubsub.getOptions()).andReturn(options);
- executorFactory.release(executor);
- EasyMock.expectLastCall();
- EasyMock.replay(executor, executorFactory, pubsub);
- AckDeadlineRenewer ackDeadlineRenewer = new AckDeadlineRenewer(pubsub);
- ackDeadlineRenewer.close();
- EasyMock.verify(pubsub, executor, executorFactory);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testCloseWithMessage() throws Exception {
- PubSub pubsub = EasyMock.createStrictMock(PubSub.class);
- ScheduledExecutorService executor = EasyMock.createStrictMock(ScheduledExecutorService.class);
- ExecutorFactory executorFactory = EasyMock.createStrictMock(ExecutorFactory.class);
- EasyMock.expect(executorFactory.get()).andReturn(executor);
- ScheduledFuture future = EasyMock.createStrictMock(ScheduledFuture.class);
- EasyMock.expect(executor.schedule(EasyMock.anyObject(), EasyMock.anyLong(),
- EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(future);
- PubSubOptions options = PubSubOptions.newBuilder()
- .setProjectId("projectId")
- .setExecutorFactory(executorFactory)
- .build();
- EasyMock.expect(pubsub.getOptions()).andReturn(options);
- EasyMock.expect(future.cancel(true)).andReturn(true);
- executorFactory.release(executor);
- EasyMock.expectLastCall();
- EasyMock.replay(executor, executorFactory, future, pubsub);
- AckDeadlineRenewer ackDeadlineRenewer = new AckDeadlineRenewer(pubsub);
- ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1);
- ackDeadlineRenewer.close();
- EasyMock.verify(pubsub, executor, executorFactory, future);
- }
-}
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
index 2b5ab2247a16..cdafc29b8556 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
@@ -24,8 +24,6 @@
import com.google.cloud.AsyncPage;
import com.google.cloud.Page;
-import com.google.cloud.pubsub.PubSub.MessageConsumer;
-import com.google.cloud.pubsub.PubSub.MessageProcessor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -438,400 +436,4 @@ public void testListSubscriptionsAsync() throws ExecutionException, InterruptedE
assertTrue(subscription2.delete());
assertTrue(subscription3.delete());
}
-
- @Test
- public void testPullMessages() {
- String topic = formatForTest("test-pull-messages-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-pull-messages-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
- assertEquals(2, messageIds.size());
- Iterator iterator = pubsub().pull(subscription, 2);
- assertEquals(message1.getPayloadAsString(), iterator.next().getPayloadAsString());
- assertEquals(message2.getPayloadAsString(), iterator.next().getPayloadAsString());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testPullMessagesAndAutoRenewDeadline() throws InterruptedException {
- String topic = formatForTest("test-pull-messages-and-renew-deadline-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-pull-messages-and-renew-deadline-subscription");
- pubsub().create(
- SubscriptionInfo.newBuilder(topic, subscription).setAckDeadLineSeconds(10).build());
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- // todo(mziccard): use bundle publish if #1017 gets fixed, or remove this comment
- pubsub().publish(topic, message1);
- pubsub().publish(topic, message2);
- Iterator iterator = pubsub().pull(subscription, 2);
- while (!iterator.hasNext()) {
- Thread.sleep(500);
- iterator = pubsub().pull(subscription, 2);
- }
- ReceivedMessage consumedMessage = iterator.next();
- if (!iterator.hasNext()) {
- iterator = pubsub().pull(subscription, 1);
- while (!iterator.hasNext()) {
- Thread.sleep(500);
- iterator = pubsub().pull(subscription, 1);
- }
- }
- Thread.sleep(15000);
- // first message was consumed while second message is still being renewed
- Iterator nextIterator = pubsub().pull(subscription, 2);
- assertTrue(nextIterator.hasNext());
- ReceivedMessage message = nextIterator.next();
- assertEquals(consumedMessage.getPayloadAsString(), message.getPayloadAsString());
- assertFalse(nextIterator.hasNext());
- consumedMessage.ack();
- iterator.next().ack();
- nextIterator = pubsub().pull(subscription, 2);
- assertFalse(nextIterator.hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testPullMessagesAndModifyAckDeadline() throws InterruptedException {
- String topic = formatForTest("test-pull-messages-and-modify-deadline-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-pull-messages-and-modify-deadline-subscription");
- pubsub().create(
- SubscriptionInfo.newBuilder(topic, subscription).setAckDeadLineSeconds(10).build());
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- // todo(mziccard): use bundle publish if #1017 gets fixed, or remove this comment
- pubsub().publish(topic, message1);
- pubsub().publish(topic, message2);
- // Consume all messages and stop ack renewal
- List receivedMessages = Lists.newArrayList(pubsub().pull(subscription, 2));
- while (receivedMessages.size() < 2) {
- Thread.sleep(500);
- Iterators.addAll(receivedMessages, pubsub().pull(subscription, 2));
- }
- receivedMessages.get(0).modifyAckDeadline(60, TimeUnit.SECONDS);
- Thread.sleep(15000);
- // first message was renewed while second message should still be sent on pull requests
- Iterator nextIterator = pubsub().pull(subscription, 2);
- assertTrue(nextIterator.hasNext());
- ReceivedMessage message = nextIterator.next();
- assertEquals(receivedMessages.get(1).getPayloadAsString(), message.getPayloadAsString());
- assertFalse(nextIterator.hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testPullNonExistingSubscription() {
- thrown.expect(PubSubException.class);
- pubsub().pull(formatForTest("non-existing-subscription"), 2);
- }
-
- @Test
- public void testPullMessagesAsync() throws ExecutionException, InterruptedException {
- String topic = formatForTest("test-pull-messages-async-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-pull-messages-async-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
- assertEquals(2, messageIds.size());
- Iterator iterator = pubsub().pullAsync(subscription, 2).get();
- assertEquals(message1.getPayloadAsString(), iterator.next().getPayloadAsString());
- assertEquals(message2.getPayloadAsString(), iterator.next().getPayloadAsString());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testPullMessagesAsyncNonImmediately() throws ExecutionException, InterruptedException {
- String topic = formatForTest("test-pull-messages-async-non-immediately-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-pull-messages-async-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Future> future = pubsub().pullAsync(subscription, 2);
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
- assertEquals(2, messageIds.size());
- Iterator iterator = future.get();
- assertEquals(message1.getPayloadAsString(), iterator.next().getPayloadAsString());
- assertEquals(message2.getPayloadAsString(), iterator.next().getPayloadAsString());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testPullAsyncNonExistingSubscription()
- throws ExecutionException, InterruptedException {
- thrown.expect(ExecutionException.class);
- pubsub().pullAsync(formatForTest("non-existing-subscription"), 2).get();
- }
-
- @Test
- public void testMessageConsumer() throws Exception {
- String topic = formatForTest("test-message-consumer-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-message-consumer-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- Set payloads = Sets.newHashSet("payload1", "payload2");
- List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
- assertEquals(2, messageIds.size());
- final List receivedMessages = Collections.synchronizedList(new ArrayList());
- final CountDownLatch countDownLatch = new CountDownLatch(2);
- MessageProcessor processor = new MessageProcessor() {
- @Override
- public void process(Message message) throws Exception {
- receivedMessages.add(message);
- countDownLatch.countDown();
- }
- };
- try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) {
- countDownLatch.await();
- }
- for (Message message : receivedMessages) {
- payloads.contains(message.getPayloadAsString());
- }
- // Messages have all been acked, they should not be pulled again
- Iterator messages = pubsub().pull(subscription, 2);
- assertFalse(messages.hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testMessageConsumerNack() throws Exception {
- String topic = formatForTest("test-message-consumer-nack-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-message-consumer-nack-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- Set payloads = Sets.newHashSet("payload1", "payload2");
- List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
- assertEquals(2, messageIds.size());
- final List receivedMessages = Collections.synchronizedList(new ArrayList());
- final CountDownLatch countDownLatch = new CountDownLatch(2);
- MessageProcessor processor = new MessageProcessor() {
- @Override
- public void process(Message message) throws Exception {
- receivedMessages.add(message);
- countDownLatch.countDown();
- throw new RuntimeException("Force nack");
- }
- };
- try (MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) {
- countDownLatch.await();
- }
- for (Message message : receivedMessages) {
- payloads.contains(message.getPayloadAsString());
- }
- // Messages have all been nacked, we should be able to pull them again
- Thread.sleep(5000);
- Iterator messages = pubsub().pull(subscription, 2);
- while (messages.hasNext()) {
- payloads.contains(messages.next().getPayloadAsString());
- }
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testMessageConsumerWithMoreMessages() throws Exception {
- String topic = formatForTest("test-message-consumer-more-messages-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-message-consumer-more-messages-subscriptions");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- int totalMessages = 200;
- Set payloads = Sets.newHashSetWithExpectedSize(totalMessages);
- List messagesToSend = Lists.newArrayListWithCapacity(totalMessages);
- for (int i = 0; i < totalMessages; i++) {
- String payload = "payload" + i;
- messagesToSend.add(Message.of(payload));
- payloads.add(payload);
-
- }
- List messageIds = pubsub().publish(topic, messagesToSend);
- assertEquals(totalMessages, messageIds.size());
- final List receivedMessages = Collections.synchronizedList(new ArrayList());
- final CountDownLatch countDownLatch = new CountDownLatch(totalMessages);
- MessageProcessor processor = new MessageProcessor() {
- @Override
- public void process(Message message) throws Exception {
- receivedMessages.add(message);
- countDownLatch.countDown();
- }
- };
- try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) {
- countDownLatch.await();
- }
- // Messages have all been acked, they should not be pulled again
- Iterator messages = pubsub().pull(subscription, totalMessages);
- assertFalse(messages.hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testMessageConsumerAndAutoRenewDeadline() throws Exception {
- String topic = formatForTest("test-message-consumer-and-renew-deadline-topic");
- pubsub().create(TopicInfo.of(topic));
- final String subscription =
- formatForTest("test-message-consumer-and-renew-deadline-subscription");
- pubsub().create(
- SubscriptionInfo.newBuilder(topic, subscription).setAckDeadLineSeconds(10).build());
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- Set payloads = Sets.newHashSet("payload1", "payload2");
- List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
- assertEquals(2, messageIds.size());
- final List receivedMessages = Collections.synchronizedList(new ArrayList());
- final CountDownLatch countDownLatch = new CountDownLatch(2);
- MessageProcessor processor = new MessageProcessor() {
- @Override
- public void process(Message message) throws Exception {
- receivedMessages.add(message);
- Thread.sleep(15000);
- // message deadline is being renewed, it should not be pulled again
- Iterator messages = pubsub().pull(subscription, 2);
- assertFalse(messages.hasNext());
- countDownLatch.countDown();
- }
- };
- try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) {
- countDownLatch.await();
- }
- for (Message message : receivedMessages) {
- payloads.contains(message.getPayloadAsString());
- }
- // Messages have all been acked, they should not be pulled again
- Iterator messages = pubsub().pull(subscription, 2);
- assertFalse(messages.hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testAckAndNackOneMessage() {
- String topic = formatForTest("test-ack-one-message-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-ack-one-message-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Message message = Message.of("payload");
- assertNotNull(pubsub().publish(topic, message));
- Iterator receivedMessages = pubsub().pull(subscription, 1);
- receivedMessages.next().nack();
- receivedMessages = pubsub().pull(subscription, 1);
- receivedMessages.next().ack();
- assertFalse(pubsub().pull(subscription, 1).hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testAckAndNackOneMessageAsync() throws ExecutionException, InterruptedException {
- String topic = formatForTest("test-ack-one-message-async-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-ack-one-message-async-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Message message = Message.of("payload");
- assertNotNull(pubsub().publish(topic, message));
- Iterator receivedMessages = pubsub().pull(subscription, 1);
- receivedMessages.next().nackAsync().get();
- receivedMessages = pubsub().pull(subscription, 1);
- receivedMessages.next().ackAsync().get();
- assertFalse(pubsub().pull(subscription, 1).hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testAckAndNackMoreMessages() throws ExecutionException, InterruptedException {
- String topic = formatForTest("test-ack-more-messages-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-ack-more-messages-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- assertNotNull(pubsub().publish(topic, message1, message2));
- Iterator receivedMessages = pubsub().pull(subscription, 2);
- pubsub().nack(subscription, receivedMessages.next().getAckId(),
- receivedMessages.next().getAckId());
- receivedMessages = pubsub().pull(subscription, 2);
- pubsub().ack(subscription, receivedMessages.next().getAckId(),
- receivedMessages.next().getAckId());
- assertFalse(pubsub().pull(subscription, 2).hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testAckAndNackMoreMessagesAsync() throws ExecutionException, InterruptedException {
- String topic = formatForTest("test-ack-more-messages-async-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-ack-more-messages-async-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- assertNotNull(pubsub().publish(topic, message1, message2));
- Iterator receivedMessages = pubsub().pull(subscription, 2);
- pubsub().nackAsync(subscription, receivedMessages.next().getAckId(),
- receivedMessages.next().getAckId())
- .get();
- receivedMessages = pubsub().pull(subscription, 2);
- pubsub().ackAsync(subscription, receivedMessages.next().getAckId(),
- receivedMessages.next().getAckId())
- .get();
- assertFalse(pubsub().pull(subscription, 2).hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testAckAndNackMessageList() throws ExecutionException, InterruptedException {
- String topic = formatForTest("test-ack-message-list-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-ack-message-list-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- assertNotNull(pubsub().publish(topic, ImmutableList.of(message1, message2)));
- Iterator receivedMessages = pubsub().pull(subscription, 2);
- pubsub().nack(subscription,
- ImmutableList.of(receivedMessages.next().getAckId(), receivedMessages.next().getAckId()));
- receivedMessages = pubsub().pull(subscription, 2);
- pubsub().ack(subscription,
- ImmutableList.of(receivedMessages.next().getAckId(), receivedMessages.next().getAckId()));
- assertFalse(pubsub().pull(subscription, 2).hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
-
- @Test
- public void testAckAndNackMessageListAsync() throws ExecutionException, InterruptedException {
- String topic = formatForTest("test-ack-message-list-async-topic");
- pubsub().create(TopicInfo.of(topic));
- String subscription = formatForTest("test-ack-message-list-async-subscription");
- pubsub().create(SubscriptionInfo.of(topic, subscription));
- Message message1 = Message.of("payload1");
- Message message2 = Message.of("payload2");
- assertNotNull(pubsub().publish(topic, ImmutableList.of(message1, message2)));
- Iterator receivedMessages = pubsub().pull(subscription, 2);
- pubsub().nackAsync(subscription, ImmutableList.of(receivedMessages.next().getAckId(),
- receivedMessages.next().getAckId())).get();
- receivedMessages = pubsub().pull(subscription, 2);
- pubsub().ackAsync(subscription, ImmutableList.of(receivedMessages.next().getAckId(),
- receivedMessages.next().getAckId())).get();
- assertFalse(pubsub().pull(subscription, 2).hasNext());
- assertTrue(pubsub().deleteSubscription(subscription));
- assertTrue(pubsub().deleteTopic(topic));
- }
}
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java
deleted file mode 100644
index c0acf8199ce0..000000000000
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Copyright 2016 Google Inc. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsub;
-
-import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
-import com.google.cloud.pubsub.PubSub.MessageConsumer;
-import com.google.cloud.pubsub.PubSub.MessageProcessor;
-import com.google.cloud.pubsub.spi.PubSubRpc;
-import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback;
-import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture;
-import com.google.common.util.concurrent.ForwardingListenableFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-public class MessageConsumerImplTest {
-
- private static final String PROJECT = "project";
- private static final String SUBSCRIPTION = "subscription";
- private static final String SUBSCRIPTION_PB = "projects/project/subscriptions/subscription";
- private static final int MAX_QUEUED_CALLBACKS = 42;
- private static final Message MESSAGE1 = Message.of("payload1");
- private static final Message MESSAGE2 = Message.of("payload2");
- private static final String ACK_ID1 = "ack-id1";
- private static final String ACK_ID2 = "ack-id2";
- private static final com.google.pubsub.v1.ReceivedMessage MESSAGE1_PB =
- com.google.pubsub.v1.ReceivedMessage.newBuilder()
- .setAckId(ACK_ID1)
- .setMessage(MESSAGE1.toPb())
- .build();
- private static final com.google.pubsub.v1.ReceivedMessage MESSAGE2_PB =
- com.google.pubsub.v1.ReceivedMessage.newBuilder()
- .setAckId(ACK_ID2)
- .setMessage(MESSAGE2.toPb())
- .build();
- private static final PullResponse PULL_RESPONSE = PullResponse.newBuilder()
- .addReceivedMessages(MESSAGE1_PB)
- .addReceivedMessages(MESSAGE2_PB)
- .build();
- private static final MessageProcessor DO_NOTHING_PROCESSOR = new MessageProcessor() {
- @Override
- public void process(Message message) throws Exception {
- // do nothing
- }
- };
- private static final MessageProcessor THROW_PROCESSOR = new MessageProcessor() {
- @Override
- public void process(Message message) throws Exception {
- throw new RuntimeException();
- }
- };
- private static final PullResponse EMPTY_RESPONSE = PullResponse.getDefaultInstance();
-
- private PubSubRpc pubsubRpc;
- private PubSub pubsub;
- private PubSubOptions options;
- private AckDeadlineRenewer renewer;
-
- @Rule
- public Timeout globalTimeout = Timeout.seconds(60);
-
- static final class TestPullFuture
- extends ForwardingListenableFuture.SimpleForwardingListenableFuture
- implements PullFuture {
-
- TestPullFuture(PullResponse response) {
- super(Futures.immediateFuture(response));
- }
-
- @Override
- public void addCallback(final PullCallback callback) {
- Futures.addCallback(delegate(), new FutureCallback() {
- @Override
- public void onSuccess(PullResponse result) {
- callback.success(result);
- }
-
- @Override
- public void onFailure(Throwable error) {
- callback.failure(error);
- }
- });
- }
- }
-
- @Before
- public void setUp() {
- pubsubRpc = EasyMock.createStrictMock(PubSubRpc.class);
- pubsub = EasyMock.createMock(PubSub.class);
- options = EasyMock.createStrictMock(PubSubOptions.class);
- renewer = EasyMock.createMock(AckDeadlineRenewer.class);
- }
-
- @After
- public void tearDown() {
- EasyMock.verify(pubsubRpc);
- EasyMock.verify(pubsub);
- EasyMock.verify(options);
- EasyMock.verify(renewer);
-
- }
-
- private static PullRequest pullRequest(int maxQueuedCallbacks) {
- return PullRequest.newBuilder()
- .setMaxMessages(maxQueuedCallbacks)
- .setSubscription(SUBSCRIPTION_PB)
- .setReturnImmediately(false)
- .build();
- }
-
- private static IAnswer createAnswer(final CountDownLatch latch) {
- return new IAnswer() {
- @Override
- public Void answer() throws Throwable {
- latch.countDown();
- return null;
- }
- };
- }
-
- @Test
- public void testMessageConsumerAck() throws Exception {
- PullRequest request = pullRequest(MAX_QUEUED_CALLBACKS);
- EasyMock.expect(options.getRpc()).andReturn(pubsubRpc);
- EasyMock.expect(options.getService()).andReturn(pubsub);
- EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();
- EasyMock.expect(pubsub.getOptions()).andReturn(options).times(2);
- final CountDownLatch latch = new CountDownLatch(2);
- EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
- EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
- EasyMock.replay(pubsub);
- EasyMock.expect(pubsubRpc.pull(request)).andReturn(new TestPullFuture(PULL_RESPONSE));
- EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject()))
- .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes();
- renewer.add(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall();
- renewer.add(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall();
- renewer.remove(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- renewer.remove(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- EasyMock.replay(pubsubRpc, options, renewer);
- try (MessageConsumer consumer =
- MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR)
- .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS)
- .build()) {
- latch.await();
- }
- }
-
- @Test
- public void testMessageConsumerNack() throws Exception {
- PullRequest request = pullRequest(MAX_QUEUED_CALLBACKS);
- EasyMock.expect(options.getRpc()).andReturn(pubsubRpc);
- EasyMock.expect(options.getService()).andReturn(pubsub);
- EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();
- EasyMock.expect(pubsub.getOptions()).andReturn(options).times(2);
- final CountDownLatch latch = new CountDownLatch(2);
- EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
- EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
- EasyMock.replay(pubsub);
- EasyMock.expect(pubsubRpc.pull(request)).andReturn(new TestPullFuture(PULL_RESPONSE));
- EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject()))
- .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes();
- renewer.add(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall();
- renewer.add(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall();
- renewer.remove(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- renewer.remove(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- EasyMock.replay(pubsubRpc, options, renewer);
- try (MessageConsumer consumer =
- MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR)
- .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS)
- .build()) {
- latch.await();
- }
- }
-
- @Test
- public void testMessageConsumerMultipleCallsAck() throws Exception {
- PullRequest request1 = pullRequest(MAX_QUEUED_CALLBACKS);
- PullRequest request2 = pullRequest(MAX_QUEUED_CALLBACKS - 1);
- PullResponse response1 = PullResponse.newBuilder()
- .addReceivedMessages(MESSAGE1_PB)
- .build();
- final PullResponse response2 = PullResponse.newBuilder()
- .addReceivedMessages(MESSAGE2_PB)
- .build();
- EasyMock.expect(options.getRpc()).andReturn(pubsubRpc);
- EasyMock.expect(options.getService()).andReturn(pubsub);
- EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();
- final CountDownLatch nextPullLatch = new CountDownLatch(1);
- final CountDownLatch latch = new CountDownLatch(2);
- EasyMock.expect(pubsub.getOptions()).andReturn(options);
- EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andAnswer(new IAnswer>() {
- @Override
- public Future answer() throws Throwable {
- nextPullLatch.await();
- return null;
- }
- });
- EasyMock.expect(pubsub.getOptions()).andReturn(options);
- EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
- EasyMock.replay(pubsub);
- EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1));
- EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() {
- @Override
- public PullFuture answer() throws Throwable {
- nextPullLatch.countDown();
- return new TestPullFuture(response2);
- }
- });
- EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject()))
- .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes();
- renewer.add(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall();
- renewer.remove(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- renewer.add(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall();
- renewer.remove(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- EasyMock.replay(pubsubRpc, options, renewer);
- try (MessageConsumer consumer =
- MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR)
- .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS)
- .build()) {
- latch.await();
- }
- }
-
- @Test
- public void testMessageConsumerMultipleCallsNack() throws Exception {
- PullRequest request1 = pullRequest(MAX_QUEUED_CALLBACKS);
- PullRequest request2 = pullRequest(MAX_QUEUED_CALLBACKS - 1);
- PullResponse response1 = PullResponse.newBuilder()
- .addReceivedMessages(MESSAGE1_PB)
- .build();
- final PullResponse response2 = PullResponse.newBuilder()
- .addReceivedMessages(MESSAGE2_PB)
- .build();
- EasyMock.expect(options.getRpc()).andReturn(pubsubRpc);
- EasyMock.expect(options.getService()).andReturn(pubsub);
- EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();
- final CountDownLatch nextPullLatch = new CountDownLatch(1);
- final CountDownLatch latch = new CountDownLatch(2);
- EasyMock.expect(pubsub.getOptions()).andReturn(options);
- EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andAnswer(new IAnswer>() {
- @Override
- public Future answer() throws Throwable {
- nextPullLatch.await();
- return null;
- }
- });
- EasyMock.expect(pubsub.getOptions()).andReturn(options);
- EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
- EasyMock.replay(pubsub);
- EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1));
- EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() {
- @Override
- public PullFuture answer() throws Throwable {
- nextPullLatch.countDown();
- return new TestPullFuture(response2);
- }
- });
- EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject()))
- .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes();
- renewer.add(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall();
- renewer.remove(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- renewer.add(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall();
- renewer.remove(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- EasyMock.replay(pubsubRpc, options, renewer);
- try (MessageConsumer consumer =
- MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR)
- .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS)
- .build()) {
- latch.await();
- }
- }
-
- @Test
- public void testMessageConsumerMaxCallbacksAck() throws Exception {
- PullRequest request1 = pullRequest(2);
- PullRequest request2 = pullRequest(1);
- final PullResponse otherPullResponse = PullResponse.newBuilder()
- .addReceivedMessages(MESSAGE1_PB)
- .build();
- EasyMock.expect(options.getRpc()).andReturn(pubsubRpc);
- EasyMock.expect(options.getService()).andReturn(pubsub);
- EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();
- EasyMock.expect(pubsub.getOptions()).andReturn(options).times(2);
- final CountDownLatch nextPullLatch = new CountDownLatch(1);
- final CountDownLatch latch = new CountDownLatch(3);
- EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
- EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andAnswer(new IAnswer>() {
- @Override
- public Future answer() throws Throwable {
- nextPullLatch.await();
- return null;
- }
- });
- EasyMock.expect(pubsub.getOptions()).andReturn(options);
- EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
- EasyMock.replay(pubsub);
- EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE));
- EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() {
- @Override
- public PullFuture answer() throws Throwable {
- nextPullLatch.countDown();
- return new TestPullFuture(otherPullResponse);
- }
- });
- EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject()))
- .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes();
- renewer.add(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall();
- renewer.add(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall();
- renewer.remove(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- renewer.remove(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- renewer.add(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall();
- renewer.remove(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- EasyMock.replay(pubsubRpc, options, renewer);
- try (MessageConsumer consumer =
- MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR)
- .maxQueuedCallbacks(2)
- .build()) {
- latch.await();
- }
- }
-
- @Test
- public void testMessageConsumerMaxCallbacksNack() throws Exception {
- PullRequest request1 = pullRequest(2);
- PullRequest request2 = pullRequest(1);
- final PullResponse otherPullResponse = PullResponse.newBuilder()
- .addReceivedMessages(MESSAGE1_PB)
- .build();
- EasyMock.expect(options.getRpc()).andReturn(pubsubRpc);
- EasyMock.expect(options.getService()).andReturn(pubsub);
- EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();
- EasyMock.expect(pubsub.getOptions()).andReturn(options).times(2);
- final CountDownLatch nextPullLatch = new CountDownLatch(1);
- final CountDownLatch latch = new CountDownLatch(3);
- EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
- EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andAnswer(new IAnswer>() {
- @Override
- public Future answer() throws Throwable {
- nextPullLatch.await();
- return null;
- }
- });
- EasyMock.expect(pubsub.getOptions()).andReturn(options);
- EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
- EasyMock.replay(pubsub);
- EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE));
- EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() {
- @Override
- public PullFuture answer() throws Throwable {
- nextPullLatch.countDown();
- return new TestPullFuture(otherPullResponse);
- }
- });
- EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject()))
- .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes();
- renewer.add(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall();
- renewer.add(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall();
- renewer.remove(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- renewer.remove(SUBSCRIPTION, ACK_ID2);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- renewer.add(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall();
- renewer.remove(SUBSCRIPTION, ACK_ID1);
- EasyMock.expectLastCall().andAnswer(createAnswer(latch));
- EasyMock.replay(pubsubRpc, options, renewer);
- try (MessageConsumer consumer =
- MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR)
- .maxQueuedCallbacks(2)
- .build()) {
- latch.await();
- }
- }
-
- @Test
- public void testClose() throws Exception {
- EasyMock.expect(options.getRpc()).andReturn(pubsubRpc);
- EasyMock.expect(options.getService()).andReturn(pubsub);
- final ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
- executor.shutdown();
- EasyMock.expectLastCall();
- EasyMock.replay(pubsubRpc, pubsub, options, executor, renewer);
- MessageConsumer consumer =
- MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR)
- .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS)
- .executorFactory(new ExecutorFactory() {
- @Override
- public ExecutorService get() {
- return executor;
- }
-
- @Override
- public void release(ExecutorService executor) {
- executor.shutdown();
- }
- }).build();
- consumer.close();
- // closing again should do nothing
- consumer.close();
- EasyMock.verify(executor);
- }
-}
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
index 9ace15da5027..23beb76c8b06 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
@@ -23,23 +23,15 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import com.google.cloud.AsyncPage;
-import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
import com.google.cloud.Identity;
import com.google.cloud.Page;
import com.google.cloud.Policy;
import com.google.cloud.RetryParams;
import com.google.cloud.Role;
-import com.google.cloud.pubsub.MessageConsumerImplTest.TestPullFuture;
import com.google.cloud.pubsub.PubSub.ListOption;
-import com.google.cloud.pubsub.PubSub.MessageConsumer;
-import com.google.cloud.pubsub.PubSub.MessageProcessor;
-import com.google.cloud.pubsub.PubSub.PullOption;
import com.google.cloud.pubsub.spi.PubSubRpc;
-import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback;
-import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture;
import com.google.cloud.pubsub.spi.PubSubRpcFactory;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
@@ -51,7 +43,6 @@
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.protobuf.Empty;
-import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.DeleteSubscriptionRequest;
import com.google.pubsub.v1.DeleteTopicRequest;
import com.google.pubsub.v1.GetSubscriptionRequest;
@@ -62,31 +53,19 @@
import com.google.pubsub.v1.ListTopicSubscriptionsResponse;
import com.google.pubsub.v1.ListTopicsRequest;
import com.google.pubsub.v1.ListTopicsResponse;
-import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ModifyPushConfigRequest;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-
-import org.easymock.Capture;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.easymock.EasyMock;
-import org.easymock.IAnswer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
public class PubSubImplTest {
private static final String PROJECT = "project";
@@ -147,17 +126,10 @@ public String apply(SubscriptionId subscriptionId) {
subscriptionId.getSubscription());
}
};
- private static final MessageProcessor DO_NOTHING = new MessageProcessor() {
- @Override
- public void process(Message message) throws Exception {
- // do nothing
- }
- };
private PubSubOptions options;
private PubSubRpcFactory rpcFactoryMock;
private PubSubRpc pubsubRpcMock;
- private AckDeadlineRenewer renewerMock;
private PubSub pubsub;
@Rule
@@ -168,18 +140,17 @@ public void process(Message message) throws Exception {
public void setUp() {
rpcFactoryMock = EasyMock.createStrictMock(PubSubRpcFactory.class);
pubsubRpcMock = EasyMock.createStrictMock(PubSubRpc.class);
- renewerMock = EasyMock.createStrictMock(AckDeadlineRenewer.class);
options = EasyMock.createMock(PubSubOptions.class);
EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();
EasyMock.expect(options.getRpc()).andReturn(pubsubRpcMock).anyTimes();
EasyMock.expect(options.getRetryParams()).andReturn(RetryParams.noRetries()).anyTimes();
- EasyMock.replay(rpcFactoryMock, pubsubRpcMock, renewerMock, options);
- EasyMock.reset(pubsubRpcMock, renewerMock);
+ EasyMock.replay(rpcFactoryMock, pubsubRpcMock, options);
+ EasyMock.reset(pubsubRpcMock);
}
@After
public void tearDown() {
- EasyMock.verify(rpcFactoryMock, pubsubRpcMock, renewerMock, options);
+ EasyMock.verify(rpcFactoryMock, pubsubRpcMock, options);
}
private void resetOptionsForList(int pageCount) {
@@ -192,8 +163,8 @@ private void resetOptionsForList(int pageCount) {
@Test
public void testGetOptions() {
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertSame(options, pubsub.getOptions());
}
@@ -202,8 +173,8 @@ public void testCreateTopic() {
com.google.pubsub.v1.Topic topicPb = TOPIC_INFO.toPb(PROJECT);
Future response = Futures.immediateFuture(topicPb);
EasyMock.expect(pubsubRpcMock.create(topicPb)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
Topic topic = pubsub.create(TOPIC_INFO);
assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topic);
}
@@ -213,8 +184,8 @@ public void testCreateTopicAsync() throws ExecutionException, InterruptedExcepti
com.google.pubsub.v1.Topic topicPb = TOPIC_INFO.toPb(PROJECT);
Future response = Futures.immediateFuture(topicPb);
EasyMock.expect(pubsubRpcMock.create(topicPb)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
Topic topic = pubsub.createAsync(TOPIC_INFO).get();
assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topic);
}
@@ -225,8 +196,8 @@ public void testGetTopic() {
Future response =
Futures.immediateFuture(TOPIC_INFO.toPb(PROJECT));
EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
Topic topic = pubsub.getTopic(TOPIC);
assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topic);
}
@@ -236,8 +207,8 @@ public void testGetTopic_Null() {
GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build();
Future responseFuture = Futures.immediateFuture(null);
EasyMock.expect(pubsubRpcMock.get(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertNull(pubsub.getTopic(TOPIC));
}
@@ -247,8 +218,8 @@ public void testGetTopicAsync() throws ExecutionException, InterruptedException
Future response =
Futures.immediateFuture(TOPIC_INFO.toPb(PROJECT));
EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
Future topicFuture = pubsub.getTopicAsync(TOPIC);
assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topicFuture.get());
}
@@ -258,8 +229,8 @@ public void testGetTopicAsync_Null() throws ExecutionException, InterruptedExcep
GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build();
Future responseFuture = Futures.immediateFuture(null);
EasyMock.expect(pubsubRpcMock.get(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertNull(pubsub.getTopicAsync(TOPIC).get());
}
@@ -268,8 +239,8 @@ public void testDeleteTopic() {
DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build();
Future response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertTrue(pubsub.deleteTopic(TOPIC));
}
@@ -278,8 +249,8 @@ public void testDeleteTopic_Null() {
DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build();
Future response = Futures.immediateFuture(null);
EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertFalse(pubsub.deleteTopic(TOPIC));
}
@@ -288,8 +259,8 @@ public void testDeleteTopicAsync() throws ExecutionException, InterruptedExcepti
DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build();
Future response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertTrue(pubsub.deleteTopicAsync(TOPIC).get());
}
@@ -298,15 +269,15 @@ public void testDeleteTopicAsync_Null() throws ExecutionException, InterruptedEx
DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build();
Future response = Futures.immediateFuture(null);
EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertFalse(pubsub.deleteTopicAsync(TOPIC).get());
}
@Test
public void testListTopics() {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build();
List topicList = ImmutableList.of(
@@ -318,7 +289,7 @@ public void testListTopics() {
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page = pubsub.listTopics();
assertEquals(cursor, page.getNextPageCursor());
assertArrayEquals(topicList.toArray(), Iterables.toArray(page.getValues(), Topic.class));
@@ -327,7 +298,7 @@ public void testListTopics() {
@Test
public void testListTopicsNextPage() {
String cursor1 = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(2);
ListTopicsRequest request1 = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build();
ListTopicsRequest request2 = ListTopicsRequest.newBuilder()
@@ -352,7 +323,7 @@ public void testListTopicsNextPage() {
Future futureResponse2 = Futures.immediateFuture(response2);
EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1);
EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page = pubsub.listTopics();
assertEquals(cursor1, page.getNextPageCursor());
assertArrayEquals(topicList1.toArray(), Iterables.toArray(page.getValues(), Topic.class));
@@ -363,7 +334,7 @@ public void testListTopicsNextPage() {
@Test
public void testListTopicsEmpty() {
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build();
List topicList = ImmutableList.of();
@@ -373,7 +344,7 @@ public void testListTopicsEmpty() {
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page = pubsub.listTopics();
assertNull(page.getNextPageCursor());
assertNull(page.getNextPage());
@@ -383,7 +354,7 @@ public void testListTopicsEmpty() {
@Test
public void testListTopicsWithOptions() {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListTopicsRequest request = ListTopicsRequest.newBuilder()
.setProject(PROJECT_PB)
@@ -399,7 +370,7 @@ public void testListTopicsWithOptions() {
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page = pubsub.listTopics(ListOption.pageSize(42), ListOption.pageToken(cursor));
assertNull(page.getNextPageCursor());
assertNull(page.getNextPage());
@@ -409,7 +380,7 @@ public void testListTopicsWithOptions() {
@Test
public void testListTopicsAsync() throws ExecutionException, InterruptedException {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build();
List topicList = ImmutableList.of(
@@ -421,7 +392,7 @@ public void testListTopicsAsync() throws ExecutionException, InterruptedExceptio
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
AsyncPage page = pubsub.listTopicsAsync().get();
assertEquals(cursor, page.getNextPageCursor());
assertArrayEquals(topicList.toArray(), Iterables.toArray(page.getValues(), Topic.class));
@@ -430,7 +401,7 @@ public void testListTopicsAsync() throws ExecutionException, InterruptedExceptio
@Test
public void testListTopicsAsyncNextPage() throws ExecutionException, InterruptedException {
String cursor1 = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(2);
ListTopicsRequest request1 = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build();
ListTopicsRequest request2 = ListTopicsRequest.newBuilder()
@@ -455,7 +426,7 @@ public void testListTopicsAsyncNextPage() throws ExecutionException, Interrupted
Future futureResponse2 = Futures.immediateFuture(response2);
EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1);
EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
AsyncPage page = pubsub.listTopicsAsync().get();
assertEquals(cursor1, page.getNextPageCursor());
assertArrayEquals(topicList1.toArray(), Iterables.toArray(page.getValues(), Topic.class));
@@ -466,7 +437,7 @@ public void testListTopicsAsyncNextPage() throws ExecutionException, Interrupted
@Test
public void testListTopicsAsyncEmpty() throws ExecutionException, InterruptedException {
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build();
List topicList = ImmutableList.of();
@@ -476,7 +447,7 @@ public void testListTopicsAsyncEmpty() throws ExecutionException, InterruptedExc
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
AsyncPage page = pubsub.listTopicsAsync().get();
assertNull(page.getNextPageCursor());
assertNull(page.getNextPageAsync().get());
@@ -487,7 +458,7 @@ public void testListTopicsAsyncEmpty() throws ExecutionException, InterruptedExc
@Test
public void testListTopicsAsyncWithOptions() throws ExecutionException, InterruptedException {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListTopicsRequest request = ListTopicsRequest.newBuilder()
.setProject(PROJECT_PB)
@@ -503,7 +474,7 @@ public void testListTopicsAsyncWithOptions() throws ExecutionException, Interrup
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
AsyncPage page =
pubsub.listTopicsAsync(ListOption.pageSize(42), ListOption.pageToken(cursor)).get();
assertNull(page.getNextPageCursor());
@@ -521,8 +492,8 @@ public void testPublishOneMessage() {
PublishResponse response = PublishResponse.newBuilder().addMessageIds(messageId).build();
Future responseFuture = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertEquals(messageId, pubsub.publish(TOPIC, MESSAGE));
}
@@ -536,8 +507,8 @@ public void testPublishOneMessageAsync() throws ExecutionException, InterruptedE
PublishResponse response = PublishResponse.newBuilder().addMessageIds(messageId).build();
Future responseFuture = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertEquals(messageId, pubsub.publishAsync(TOPIC, MESSAGE).get());
}
@@ -553,8 +524,8 @@ public void testPublishMoreMessages() {
.build();
Future responseFuture = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertEquals(messageIds, pubsub.publish(TOPIC, MESSAGE, MESSAGE));
}
@@ -570,8 +541,8 @@ public void testPublishMoreMessagesAsync() throws ExecutionException, Interrupte
.build();
Future responseFuture = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertEquals(messageIds, pubsub.publishAsync(TOPIC, MESSAGE, MESSAGE).get());
}
@@ -587,8 +558,8 @@ public void testPublishMessageList() {
.build();
Future responseFuture = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertEquals(messageIds, pubsub.publish(TOPIC, ImmutableList.of(MESSAGE, MESSAGE)));
}
@@ -604,8 +575,8 @@ public void testPublishMessageListAsync() throws ExecutionException, Interrupted
.build();
Future responseFuture = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertEquals(messageIds, pubsub.publishAsync(TOPIC, ImmutableList.of(MESSAGE, MESSAGE)).get());
}
@@ -615,8 +586,8 @@ public void testCreateSubscription() {
Future response =
Futures.immediateFuture(subscriptionPb);
EasyMock.expect(pubsubRpcMock.create(subscriptionPb)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
Subscription subscription = pubsub.create(SUBSCRIPTION_INFO);
assertEquals(
new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)),
@@ -629,8 +600,8 @@ public void testCreateSubscriptionAsync() throws ExecutionException, Interrupted
Future response =
Futures.immediateFuture(subscriptionPb);
EasyMock.expect(pubsubRpcMock.create(subscriptionPb)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
Subscription subscription = pubsub.createAsync(SUBSCRIPTION_INFO).get();
assertEquals(
new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)),
@@ -644,8 +615,8 @@ public void testGetSubscription() {
Future response =
Futures.immediateFuture(SUBSCRIPTION_INFO.toPb(PROJECT));
EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
Subscription subscription = pubsub.getSubscription(SUBSCRIPTION);
assertEquals(
new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)),
@@ -658,8 +629,8 @@ public void testGetSubscription_Null() {
GetSubscriptionRequest.newBuilder().setSubscription(SUBSCRIPTION_NAME_PB).build();
Future response = Futures.immediateFuture(null);
EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertNull(pubsub.getSubscription(SUBSCRIPTION));
}
@@ -670,8 +641,8 @@ public void testGetSubscriptionAsync() throws ExecutionException, InterruptedExc
Future response =
Futures.immediateFuture(SUBSCRIPTION_INFO.toPb(PROJECT));
EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
Subscription subscription = pubsub.getSubscriptionAsync(SUBSCRIPTION).get();
assertEquals(
new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)),
@@ -684,8 +655,8 @@ public void testGetSubscriptionAsync_Null() throws ExecutionException, Interrupt
GetSubscriptionRequest.newBuilder().setSubscription(SUBSCRIPTION_NAME_PB).build();
Future response = Futures.immediateFuture(null);
EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertNull(pubsub.getSubscriptionAsync(SUBSCRIPTION).get());
}
@@ -696,8 +667,8 @@ public void testDeleteSubscription() {
.build();
Future response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertTrue(pubsub.deleteSubscription(SUBSCRIPTION));
}
@@ -708,8 +679,8 @@ public void testDeleteSubscription_Null() {
.build();
Future response = Futures.immediateFuture(null);
EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertFalse(pubsub.deleteSubscription(SUBSCRIPTION));
}
@@ -720,8 +691,8 @@ public void testDeleteSubscriptionAsync() throws ExecutionException, Interrupted
.build();
Future response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertTrue(pubsub.deleteSubscriptionAsync(SUBSCRIPTION).get());
}
@@ -732,8 +703,8 @@ public void testDeleteSubscriptionAsync_Null() throws ExecutionException, Interr
.build();
Future response = Futures.immediateFuture(null);
EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
assertFalse(pubsub.deleteSubscriptionAsync(SUBSCRIPTION).get());
}
@@ -745,8 +716,8 @@ public void testReplacePushConfig() {
.build();
Future response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
pubsub.replacePushConfig(SUBSCRIPTION, PUSH_CONFIG);
}
@@ -758,8 +729,8 @@ public void testReplacePushConfig_Null() {
.build();
Future response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
pubsub.replacePushConfig(SUBSCRIPTION, null);
}
@@ -771,8 +742,8 @@ public void testReplacePushConfigAsync() throws ExecutionException, InterruptedE
.build();
Future response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
pubsub.replacePushConfigAsync(SUBSCRIPTION, PUSH_CONFIG).get();
}
@@ -784,15 +755,15 @@ public void testReplacePushConfigAsync_Null() throws ExecutionException, Interru
.build();
Future response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
- EasyMock.replay(pubsubRpcMock, renewerMock);
- pubsub = new PubSubImpl(options, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
+ pubsub = new PubSubImpl(options);
pubsub.replacePushConfigAsync(SUBSCRIPTION, null).get();
}
@Test
public void testListSubscriptions() {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder()
.setProject(PROJECT_PB)
@@ -806,7 +777,7 @@ public void testListSubscriptions() {
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page = pubsub.listSubscriptions();
assertEquals(cursor, page.getNextPageCursor());
assertArrayEquals(subscriptionList.toArray(),
@@ -816,7 +787,7 @@ public void testListSubscriptions() {
@Test
public void testListSubscriptionsNextPage() {
String cursor1 = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(2);
ListSubscriptionsRequest request1 = ListSubscriptionsRequest.newBuilder()
.setProject(PROJECT_PB)
@@ -843,7 +814,7 @@ public void testListSubscriptionsNextPage() {
Future futureResponse2 = Futures.immediateFuture(response2);
EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1);
EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page = pubsub.listSubscriptions();
assertEquals(cursor1, page.getNextPageCursor());
assertArrayEquals(subscriptionList1.toArray(),
@@ -856,7 +827,7 @@ public void testListSubscriptionsNextPage() {
@Test
public void testListSubscriptionsEmpty() {
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder()
.setProject(PROJECT_PB)
@@ -868,7 +839,7 @@ public void testListSubscriptionsEmpty() {
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page = pubsub.listSubscriptions();
assertNull(page.getNextPageCursor());
assertNull(page.getNextPage());
@@ -879,7 +850,7 @@ public void testListSubscriptionsEmpty() {
@Test
public void testListSubscriptionsWithOptions() {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder()
.setProject(PROJECT_PB)
@@ -895,7 +866,7 @@ public void testListSubscriptionsWithOptions() {
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page =
pubsub.listSubscriptions(ListOption.pageSize(42), ListOption.pageToken(cursor));
assertNull(page.getNextPageCursor());
@@ -907,7 +878,7 @@ public void testListSubscriptionsWithOptions() {
@Test
public void testListSubscriptionsAsync() throws ExecutionException, InterruptedException {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder()
.setProject(PROJECT_PB)
@@ -921,7 +892,7 @@ public void testListSubscriptionsAsync() throws ExecutionException, InterruptedE
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
AsyncPage page = pubsub.listSubscriptionsAsync().get();
assertEquals(cursor, page.getNextPageCursor());
assertArrayEquals(subscriptionList.toArray(),
@@ -931,7 +902,7 @@ public void testListSubscriptionsAsync() throws ExecutionException, InterruptedE
@Test
public void testListSubscriptionsAsyncNextPage() throws ExecutionException, InterruptedException {
String cursor1 = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(2);
ListSubscriptionsRequest request1 = ListSubscriptionsRequest.newBuilder()
.setProject(PROJECT_PB)
@@ -958,7 +929,7 @@ public void testListSubscriptionsAsyncNextPage() throws ExecutionException, Inte
Future futureResponse2 = Futures.immediateFuture(response2);
EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1);
EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
AsyncPage page = pubsub.listSubscriptionsAsync().get();
assertEquals(cursor1, page.getNextPageCursor());
assertArrayEquals(subscriptionList1.toArray(),
@@ -971,7 +942,7 @@ public void testListSubscriptionsAsyncNextPage() throws ExecutionException, Inte
@Test
public void testListSubscriptionsAsyncEmpty() throws ExecutionException, InterruptedException {
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder()
.setProject(PROJECT_PB)
@@ -983,7 +954,7 @@ public void testListSubscriptionsAsyncEmpty() throws ExecutionException, Interru
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
AsyncPage page = pubsub.listSubscriptionsAsync().get();
assertNull(page.getNextPageCursor());
assertNull(page.getNextPageAsync().get());
@@ -996,7 +967,7 @@ public void testListSubscriptionsAsyncEmpty() throws ExecutionException, Interru
public void testListSubscriptionsAsyncWithOptions()
throws ExecutionException, InterruptedException {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
resetOptionsForList(1);
ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder()
.setProject(PROJECT_PB)
@@ -1012,7 +983,7 @@ public void testListSubscriptionsAsyncWithOptions()
.build();
Future futureResponse = Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
AsyncPage page =
pubsub.listSubscriptionsAsync(ListOption.pageSize(42), ListOption.pageToken(cursor)).get();
assertNull(page.getNextPageCursor());
@@ -1025,7 +996,7 @@ public void testListSubscriptionsAsyncWithOptions()
@Test
public void testListTopicSubscriptions() {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder()
.setTopic(TOPIC_NAME_PB)
.build();
@@ -1039,7 +1010,7 @@ public void testListTopicSubscriptions() {
Future futureResponse =
Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page = pubsub.listSubscriptions(TOPIC);
assertEquals(cursor, page.getNextPageCursor());
assertArrayEquals(subscriptionList.toArray(),
@@ -1049,7 +1020,7 @@ public void testListTopicSubscriptions() {
@Test
public void testListTopicSubscriptionsNextPage() {
String cursor1 = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
ListTopicSubscriptionsRequest request1 = ListTopicSubscriptionsRequest.newBuilder()
.setTopic(TOPIC_NAME_PB)
.build();
@@ -1077,7 +1048,7 @@ public void testListTopicSubscriptionsNextPage() {
Futures.immediateFuture(response2);
EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1);
EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page = pubsub.listSubscriptions(TOPIC);
assertEquals(cursor1, page.getNextPageCursor());
assertArrayEquals(subscriptionList1.toArray(),
@@ -1090,7 +1061,7 @@ public void testListTopicSubscriptionsNextPage() {
@Test
public void testListTopicSubscriptionsEmpty() {
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder()
.setTopic(TOPIC_NAME_PB)
.build();
@@ -1102,7 +1073,7 @@ public void testListTopicSubscriptionsEmpty() {
Future futureResponse =
Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page = pubsub.listSubscriptions(TOPIC);
assertNull(page.getNextPageCursor());
assertNull(page.getNextPage());
@@ -1113,7 +1084,7 @@ public void testListTopicSubscriptionsEmpty() {
@Test
public void testListTopicSubscriptionsWithOptions() {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder()
.setTopic(TOPIC_NAME_PB)
.setPageSize(42)
@@ -1129,7 +1100,7 @@ public void testListTopicSubscriptionsWithOptions() {
Future futureResponse =
Futures.immediateFuture(response);
EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse);
- EasyMock.replay(pubsubRpcMock, renewerMock);
+ EasyMock.replay(pubsubRpcMock);
Page page =
pubsub.listSubscriptions(TOPIC, ListOption.pageSize(42), ListOption.pageToken(cursor));
assertNull(page.getNextPageCursor());
@@ -1141,7 +1112,7 @@ public void testListTopicSubscriptionsWithOptions() {
@Test
public void testListTopicSubscriptionsAsync() throws ExecutionException, InterruptedException {
String cursor = "cursor";
- pubsub = new PubSubImpl(options, renewerMock);
+ pubsub = new PubSubImpl(options);
ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder()
.setTopic(TOPIC_NAME_PB)
.build();
@@ -1155,7 +1126,7 @@ public void testListTopicSubscriptionsAsync() throws ExecutionException, Interru
Future