Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

delete pull-related methods from PubSub #1487

Merged
merged 7 commits into from
Dec 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import com.google.cloud.Role;
import com.google.cloud.pubsub.Message;
import com.google.cloud.pubsub.PubSub;
import com.google.cloud.pubsub.PubSub.MessageProcessor;
import com.google.cloud.pubsub.PubSubOptions;
import com.google.cloud.pubsub.PushConfig;
import com.google.cloud.pubsub.ReceivedMessage;
import com.google.cloud.pubsub.Subscription;
import com.google.cloud.pubsub.SubscriptionId;
import com.google.cloud.pubsub.SubscriptionInfo;
Expand Down Expand Up @@ -453,192 +451,6 @@ public String params() {
}
}

/**
* This class demonstrates how to acknowledge Pub/Sub messages for a subscription.
*
* @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Receiving
* pull messages</a>
*/
private static class AckMessagesAction extends MessagesAction {
@Override
public void run(PubSub pubsub, Tuple<String, List<String>> params) {
String subscription = params.x();
List<String> ackIds = params.y();
pubsub.ack(subscription, ackIds);
System.out.printf("Acked %d messages for subscription %s%n", ackIds.size(), subscription);
}
}

/**
* This class demonstrates how to "nack" Pub/Sub messages for a subscription. This action
* corresponds to setting the acknowledge deadline to 0.
*
* @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Message
* acknowledgement deadline</a>
*/
private static class NackMessagesAction extends MessagesAction {
@Override
public void run(PubSub pubsub, Tuple<String, List<String>> params) {
String subscription = params.x();
List<String> ackIds = params.y();
pubsub.nack(subscription, ackIds);
System.out.printf("Nacked %d messages for subscription %s%n", ackIds.size(), subscription);
}
}

/**
* This class demonstrates how modify the acknowledge deadline for messages in a Pub/Sub
* subscription.
*
* @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Message
* acknowledgement deadline</a>
*/
private static class ModifyAckDeadlineAction
extends PubSubAction<Tuple<ModifyAckDeadlineAction.SubscriptionAndDeadline, List<String>>> {

static class SubscriptionAndDeadline {

private final String subscription;
private final int deadlineMillis;

private SubscriptionAndDeadline(String subscription, int deadlineMillis) {
this.subscription = subscription;
this.deadlineMillis = deadlineMillis;
}

String subscription() {
return subscription;
}

int deadlineMillis() {
return deadlineMillis;
}
}

@Override
public void run(PubSub pubsub, Tuple<SubscriptionAndDeadline, List<String>> params)
throws Exception {
String subscription = params.x().subscription();
int deadline = params.x().deadlineMillis();
List<String> ackIds = params.y();
pubsub.modifyAckDeadline(subscription, deadline, TimeUnit.MILLISECONDS, ackIds);
System.out.printf("Ack deadline set to %d for %d messages in subscription %s%n", deadline,
ackIds.size(), subscription);
}

@Override
Tuple<SubscriptionAndDeadline, List<String>> parse(String... args) throws Exception {
if (args.length < 3) {
throw new IllegalArgumentException("Missing required subscription, deadline and ack IDs");
}
String subscription = args[0];
int deadline = Integer.parseInt(args[1]);
return Tuple.of(new SubscriptionAndDeadline(subscription, deadline),
Arrays.asList(Arrays.copyOfRange(args, 2, args.length)));
}

@Override
public String params() {
return "<subscription> <deadlineMillis> <ackId>+";
}
}

/**
* This class demonstrates how to asynchronously pull messages from a Pub/Sub pull subscription.
* Messages are pulled until a timeout is reached.
*
* @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Receiving
* pull messages</a>
*/
private static class PullAsyncAction extends PubSubAction<Tuple<String, Long>> {
@Override
public void run(PubSub pubsub, Tuple<String, Long> params) throws Exception {
String subscription = params.x();
Long timeout = params.y();
final AtomicInteger messageCount = new AtomicInteger();
MessageProcessor messageProcessor = new MessageProcessor() {

@Override
public void process(Message message) throws Exception {
System.out.printf("Received message \"%s\"%n", message);
messageCount.incrementAndGet();
}
};
try (PubSub.MessageConsumer consumer = pubsub.pullAsync(subscription, messageProcessor)) {
Thread.sleep(timeout);
}
System.out.printf("Pulled %d messages from subscription %s%n", messageCount.get(),
subscription);
}

@Override
Tuple<String, Long> parse(String... args) throws Exception {
String message;
if (args.length > 2) {
message = "Too many arguments.";
} else if (args.length < 1) {
message = "Missing required subscription name";
} else {
String subscription = args[0];
long timeout = 60_000;
if (args.length == 2) {
timeout = Long.parseLong(args[1]);
}
return Tuple.of(subscription, timeout);
}
throw new IllegalArgumentException(message);
}

@Override
public String params() {
return "<subscription> <timeoutMillis>?";
}
}

/**
* This class demonstrates how to synchronously pull messages from a Pub/Sub pull subscription.
* No more than the requested number of messages are pulled. Possibly less messages are pulled.
*
* @see <a href="https://cloud.google.com/pubsub/subscriber#receiving-pull-messages">Receiving
* pull messages</a>
*/
private static class PullSyncAction extends PubSubAction<Tuple<String, Integer>> {
@Override
public void run(PubSub pubsub, Tuple<String, Integer> params) throws Exception {
String subscription = params.x();
Integer maxMessages = params.y();
Iterator<ReceivedMessage> messageIterator = pubsub.pull(subscription, maxMessages);
int messageCount = 0;
while (messageIterator.hasNext()) {
ReceivedMessage message = messageIterator.next();
System.out.printf("Received message \"%s\"%n", message);
message.ack();
messageCount++;
}
System.out.printf("Pulled %d messages from subscription %s%n", messageCount, subscription);
}

@Override
Tuple<String, Integer> parse(String... args) throws Exception {
String message;
if (args.length == 2) {
String subscription = args[0];
int maxMessages = Integer.parseInt(args[1]);
return Tuple.of(subscription, maxMessages);
} else if (args.length > 2) {
message = "Too many arguments.";
} else {
message = "Missing required subscription name";
}
throw new IllegalArgumentException(message);
}

@Override
public String params() {
return "<subscription> <maxMessages>";
}
}

private abstract static class GetPolicyAction extends PubSubAction<String> {
@Override
String parse(String... args) throws Exception {
Expand Down Expand Up @@ -817,8 +629,6 @@ public void run(PubSub pubsub, Tuple<String, List<String>> param) throws Excepti
LIST_ACTIONS.put("subscriptions", new ListSubscriptionsAction());
DELETE_ACTIONS.put("topic", new DeleteTopicAction());
DELETE_ACTIONS.put("subscription", new DeleteSubscriptionAction());
PULL_ACTIONS.put("async", new PullAsyncAction());
PULL_ACTIONS.put("sync", new PullSyncAction());
GET_IAM_ACTIONS.put("topic", new GetTopicPolicyAction());
GET_IAM_ACTIONS.put("subscription", new GetSubscriptionPolicyAction());
REPLACE_IAM_ACTIONS.put("topic", new AddIdentityTopicAction());
Expand All @@ -835,9 +645,6 @@ public void run(PubSub pubsub, Tuple<String, List<String>> param) throws Excepti
ACTIONS.put("test-permissions", new ParentAction(TEST_IAM_ACTIONS));
ACTIONS.put("publish", new PublishMessagesAction());
ACTIONS.put("replace-push-config", new ReplacePushConfigAction());
ACTIONS.put("ack", new AckMessagesAction());
ACTIONS.put("nack", new NackMessagesAction());
ACTIONS.put("modify-ack-deadline", new ModifyAckDeadlineAction());
}

private static void printUsage() {
Expand Down

This file was deleted.

Loading