Skip to content

Commit

Permalink
Merge pull request #1593 from pongad/merge
Browse files Browse the repository at this point in the history
merge pubsub-hp into master
  • Loading branch information
pongad authored Feb 7, 2017
2 parents 8f4828f + 8cf7a2b commit c230c0f
Show file tree
Hide file tree
Showing 29 changed files with 4,780 additions and 442 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package com.google.cloud.examples.pubsub.snippets;

import com.google.cloud.pubsub.Message;
import com.google.cloud.pubsub.PubSub;
import com.google.cloud.pubsub.PubSub.MessageConsumer;
import com.google.cloud.pubsub.PubSub.MessageProcessor;
import com.google.cloud.pubsub.PubSubOptions;
import com.google.cloud.pubsub.Subscription;
import com.google.cloud.pubsub.SubscriptionInfo;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.Subscriber;
import com.google.cloud.pubsub.spi.v1.SubscriberClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;

/**
* A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull subscription and
Expand All @@ -31,18 +33,38 @@
public class CreateSubscriptionAndPullMessages {

public static void main(String... args) throws Exception {
try (PubSub pubsub = PubSubOptions.getDefaultInstance().getService()) {
Subscription subscription =
pubsub.create(SubscriptionInfo.of("test-topic", "test-subscription"));
MessageProcessor callback = new MessageProcessor() {
@Override
public void process(Message message) throws Exception {
System.out.printf("Received message \"%s\"%n", message.getPayloadAsString());
}
};
// Create a message consumer and pull messages (for 60 seconds)
try (MessageConsumer consumer = subscription.pullAsync(callback)) {
Thread.sleep(60_000);
TopicName topic = TopicName.create("test-project", "test-topic");
SubscriptionName subscription = SubscriptionName.create("test-project", "test-subscription");

try (SubscriberClient subscriberClient = SubscriberClient.create()) {
subscriberClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);
}

MessageReceiver receiver =
new MessageReceiver() {
@Override
public void receiveMessage(
PubsubMessage message, SettableFuture<MessageReceiver.AckReply> response) {
System.out.println("got message: " + message.getData().toStringUtf8());
response.set(MessageReceiver.AckReply.ACK);
}
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscription, receiver).build();
subscriber.addListener(
new Subscriber.SubscriberListener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
System.err.println(failure);
}
},
MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();
Thread.sleep(60000);
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,46 @@

package com.google.cloud.examples.pubsub.snippets;

import com.google.cloud.pubsub.Message;
import com.google.cloud.pubsub.PubSub;
import com.google.cloud.pubsub.PubSubOptions;
import com.google.cloud.pubsub.Topic;
import com.google.cloud.pubsub.TopicInfo;
import com.google.cloud.pubsub.spi.v1.Publisher;
import com.google.cloud.pubsub.spi.v1.PublisherClient;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub topic and asynchronously
* publish messages to it.
*/
public class CreateTopicAndPublishMessages {

public static void main(String... args) throws Exception {
try (PubSub pubsub = PubSubOptions.getDefaultInstance().getService()) {
Topic topic = pubsub.create(TopicInfo.of("test-topic"));
Message message1 = Message.of("First message");
Message message2 = Message.of("Second message");
topic.publishAsync(message1, message2);
TopicName topic = TopicName.create("test-project", "test-topic");
try (PublisherClient publisherClient = PublisherClient.create()) {
publisherClient.createTopic(topic);
}

Publisher publisher = null;
try {
publisher = Publisher.newBuilder(topic).build();
List<String> messages = Arrays.asList("first message", "second message");
List<ListenableFuture<String>> messageIds = new ArrayList<>();
for (String message : messages) {
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ListenableFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
messageIds.add(messageIdFuture);
}
for (String messageId : Futures.allAsList(messageIds).get()) {
System.out.println("published with message ID: " + messageId);
}
} finally {
if (publisher != null) {
publisher.shutdown();
}
}
}
}
11 changes: 7 additions & 4 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@
<version>3.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down Expand Up @@ -121,10 +127,7 @@
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<!-- Downgrading to 3.1 because of https://issues.apache.org/jira/browse/MCOMPILER-236 -->
<!-- Upgrade to 3.5.1 which fixes the problem when available -->
<!-- <version>3.5.1</version> -->
<version>3.1</version>
<version>3.5.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2017 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.spi.v1;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicInteger;

/** Provides a simplistic round robin, guarding for overflow. */
class AtomicRoundRobin {
private final int max;
private final AtomicInteger current;

AtomicRoundRobin(int max) {
Preconditions.checkArgument(max > 0);
this.max = max;
current = new AtomicInteger(0);
}

int next() {
int next = current.getAndIncrement() % max;
if (next < 0) {
next += max;
}
return next;
}

@VisibleForTesting
void set(int i) {
current.set(i);
}
}
Loading

0 comments on commit c230c0f

Please # to comment.