Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add base class for operation options, javadoc and tests #996

Merged
merged 2 commits into from
May 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;

import java.io.Serializable;
import java.util.Objects;

/**
* Base class for Pub/Sub operation options.
*/
abstract class Option implements Serializable {

private static final long serialVersionUID = 4956295408130172192L;

private final OptionType optionType;
private final Object value;

interface OptionType {

String name();
}

Option(OptionType optionType, Object value) {
this.optionType = checkNotNull(optionType);
this.value = value;
}

@SuppressWarnings("unchecked")
<T extends OptionType> T optionType() {
return (T) optionType;
}

Object value() {
return value;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof Option)) {
return false;
}
Option other = (Option) obj;
return Objects.equals(optionType, other.optionType)
&& Objects.equals(value, other.value);
}

@Override
public int hashCode() {
return Objects.hash(optionType, value);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("name", optionType.name())
.add("value", value)
.toString();
}
}
122 changes: 52 additions & 70 deletions gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.google.cloud.Page;
import com.google.cloud.Service;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand All @@ -33,65 +33,79 @@
*/
public interface PubSub extends Service<PubSubOptions> {

final class ListOption implements Serializable {
/**
* Class for specifying options for listing topics and subscriptions.
*/
final class ListOption extends Option {

private static final long serialVersionUID = 6517442127283383124L;

private final Option option;
private final Object value;
enum OptionType implements Option.OptionType {
PAGE_SIZE, PAGE_TOKEN;

enum Option {
PAGE_SIZE, PAGE_TOKEN
}
@SuppressWarnings("unchecked")
<T> T get(Map<Option.OptionType, ?> options) {
return (T) options.get(this);
}

private ListOption(Option option, Object value) {
this.option = option;
this.value = value;
}
String getString(Map<Option.OptionType, ?> options) {
return get(options);
}

Option option() {
return option;
Integer getInteger(Map<Option.OptionType, ?> options) {
return get(options);
}
}

Object value() {
return value;
private ListOption(OptionType option, Object value) {
super(option, value);
}

/**
* Returns an option to specify the maximum number of resources returned per page.
*/
public static ListOption pageSize(int pageSize) {
return new ListOption(Option.PAGE_SIZE, pageSize);
return new ListOption(OptionType.PAGE_SIZE, pageSize);
}

/**
* Returns an option to specify the page token from which to start listing resources.
*/
public static ListOption pageToken(String pageToken) {
return new ListOption(Option.PAGE_TOKEN, pageToken);
return new ListOption(OptionType.PAGE_TOKEN, pageToken);
}
}

final class PullOption implements Serializable {

private static final long serialVersionUID = -5220474819637439937L;
/**
* Class for specifying options for pulling messages.
*/
final class PullOption extends Option {

private final Option option;
private final Object value;
private static final long serialVersionUID = 4792164134340316582L;

enum Option {
MAX_MESSAGES
}
enum OptionType implements Option.OptionType {
MAX_CONCURRENT_CALLBACKS;

private PullOption(Option option, Object value) {
this.option = option;
this.value = value;
}
@SuppressWarnings("unchecked")
<T> T get(Map<Option.OptionType, ?> options) {
return (T) options.get(this);
}

Option option() {
return option;
Integer getInteger(Map<Option.OptionType, ?> options) {
return get(options);
}
}

Object value() {
return value;
private PullOption(Option.OptionType option, Object value) {
super(option, value);
}

public static PullOption maxMessages(int maxMessages) {
return new PullOption(Option.MAX_MESSAGES, maxMessages);
/**
* Returns an option to specify the maximum number of messages that can be executed
* concurrently at any time.
*/
public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
}
}

Expand All @@ -108,38 +122,6 @@ interface MessageProcessor {
*/
interface MessageConsumer extends AutoCloseable {

final class PullOption implements Serializable {

private static final long serialVersionUID = 4792164134340316582L;

private final Option option;
private final Object value;

enum Option {
MAX_CONCURRENT_CALLBACKS
}

private PullOption(Option option, Object value) {
this.option = option;
this.value = value;
}

Option option() {
return option;
}

Object value() {
return value;
}

public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
return new PullOption(Option.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
}
}

void start(MessageConsumer.PullOption... options);

void stop();
}

Topic create(TopicInfo topic);
Expand Down Expand Up @@ -198,11 +180,11 @@ public static PullOption maxConcurrentCallbacks(int maxConcurrency) {

Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic, ListOption... options);

Iterator<ReceivedMessage> pull(String subscription, PullOption... options);
Iterator<ReceivedMessage> pull(String subscription, int maxMessages);

Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options);
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);

MessageConsumer pullAsync(String subscription, MessageProcessor callback);
MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);

void ack(String subscription, String ackId, String... ackIds);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,
}

@Override
public Iterator<ReceivedMessage> pull(String subscription, PullOption... options) {
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
// this should set return_immediately to true
return null;
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options) {
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) {
// though this method can set return_immediately to false (as future can be canceled) I
// suggest to keep it false so sync could delegate to asyc and use the same options
// this method also should use the VTKIT thread-pool to renew ack deadline for non consumed
Expand All @@ -211,7 +211,8 @@ public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOpti
}

@Override
public MessageConsumer pullAsync(String subscription, MessageProcessor callback) {
public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
PullOption... options) {
// this method should use the VTKIT thread-pool (maybe getting it should be part of the spi)
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ public Future<Void> replacePushConfigAsync(PushConfig pushConfig) {
return pubsub.replacePushConfigAsync(name(), pushConfig);
}

public Iterator<ReceivedMessage> pull(PullOption... options) {
return pubsub.pull(name(), options);
public Iterator<ReceivedMessage> pull(int maxMessages) {
return pubsub.pull(name(), maxMessages);
}

public Future<Iterator<ReceivedMessage>> pullAsync(PullOption... options) {
return pubsub.pullAsync(name(), options);
public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
return pubsub.pullAsync(name(), maxMessages);
}

public MessageConsumer pullAsync(MessageProcessor callback) {
return pubsub.pullAsync(name(), callback);
public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
return pubsub.pullAsync(name(), callback, options);
}

private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;

import com.google.cloud.pubsub.Option.OptionType;
import com.google.cloud.pubsub.PubSub.ListOption;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class OptionTest {

private static final OptionType OPTION_TYPE = ListOption.OptionType.PAGE_SIZE;
private static final OptionType ANOTHER_OPTION_TYPE = ListOption.OptionType.PAGE_TOKEN;
private static final String VALUE = "some value";
private static final String OTHER_VALUE = "another value";
private static final Option OPTION = new Option(OPTION_TYPE, VALUE) {};
private static final Option OPTION_EQUALS = new Option(OPTION_TYPE, VALUE) {};
private static final Option OPTION_NOT_EQUALS1 = new Option(ANOTHER_OPTION_TYPE, OTHER_VALUE) {};
private static final Option OPTION_NOT_EQUALS2 = new Option(ANOTHER_OPTION_TYPE, VALUE) {};

@Rule
public ExpectedException thrown = ExpectedException.none();

@Test
public void testEquals() {
assertEquals(OPTION, OPTION_EQUALS);
assertNotEquals(OPTION, OPTION_NOT_EQUALS1);
assertNotEquals(OPTION, OPTION_NOT_EQUALS2);
}

@Test
public void testHashCode() {
assertEquals(OPTION.hashCode(), OPTION_EQUALS.hashCode());
}

@Test
public void testConstructor() {
assertEquals(OPTION_TYPE, OPTION.optionType());
assertEquals(VALUE, OPTION.value());
Option option = new Option(OPTION_TYPE, null) {};
assertEquals(OPTION_TYPE, option.optionType());
assertNull(option.value());
thrown.expect(NullPointerException.class);
new Option(null, VALUE) {};
}
}
Loading