Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

googlecloud: Replace context for Publisher with configurable timeouts #83

Merged
merged 2 commits into from
Jun 1, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions docs/content/docs/getting-started/googlecloud/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"context"
"log"

"github.com/ThreeDotsLabs/watermill/message/infrastructure/googlecloud"

"github.com/ThreeDotsLabs/watermill"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/googlecloud"
)

func main() {
Expand Down Expand Up @@ -37,7 +35,7 @@ func main() {

go process(messages)

publisher, err := googlecloud.NewPublisher(context.Background(), googlecloud.PublisherConfig{
publisher, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{
ProjectID: "test-project",
})
if err != nil {
Expand Down
28 changes: 20 additions & 8 deletions message/infrastructure/googlecloud/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package googlecloud
import (
"context"
"sync"

"github.com/ThreeDotsLabs/watermill"
"time"

"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"google.golang.org/api/option"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)

Expand All @@ -21,8 +21,6 @@ var (
)

type Publisher struct {
ctx context.Context

topics map[string]*pubsub.Topic
topicsLock sync.RWMutex
closed bool
Expand All @@ -39,6 +37,11 @@ type PublisherConfig struct {
// Otherwise, trying to subscribe to non-existent subscription results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool

// ConnectTimeout defines the timeout for connecting to Pub/Sub
ConnectTimeout time.Duration
// PublishTimeout defines the timeout for publishing messages.
PublishTimeout time.Duration

// Settings for cloud.google.com/go/pubsub client library.
PublishSettings *pubsub.PublishSettings
ClientOptions []option.ClientOption
Expand All @@ -52,22 +55,30 @@ func (c *PublisherConfig) setDefaults() {
if c.Marshaler == nil {
c.Marshaler = DefaultMarshalerUnmarshaler{}
}

if c.ConnectTimeout == 0 {
c.ConnectTimeout = time.Second * 10
}
if c.PublishTimeout == 0 {
c.PublishTimeout = time.Second * 5
}
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}

func NewPublisher(ctx context.Context, config PublisherConfig) (*Publisher, error) {
func NewPublisher(config PublisherConfig) (*Publisher, error) {
config.setDefaults()

pub := &Publisher{
ctx: ctx,
topics: map[string]*pubsub.Topic{},
config: config,
}

var err error

ctx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout)
defer cancel()

pub.client, err = pubsub.NewClient(ctx, config.ProjectID, config.ClientOptions...)
if err != nil {
return nil, err
Expand All @@ -88,7 +99,8 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
return ErrPublisherClosed
}

ctx := p.ctx
ctx, cancel := context.WithTimeout(context.Background(), p.config.PublishTimeout)
defer cancel()

t, err := p.topic(ctx, topic)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions message/infrastructure/googlecloud/pubsub_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (

func BenchmarkSubscriber(b *testing.B) {
infrastructure.BenchSubscriber(b, func(n int) message.PubSub {
ctx := context.Background()
logger := watermill.NopLogger{}

publisher, err := googlecloud.NewPublisher(ctx, googlecloud.PublisherConfig{})
publisher, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{})
m110 marked this conversation as resolved.
Show resolved Hide resolved

ctx := context.Background()
subscriber, err := googlecloud.NewSubscriber(
ctx,
googlecloud.SubscriberConfig{},
Expand Down
9 changes: 4 additions & 5 deletions message/infrastructure/googlecloud/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@ import (
func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscriptionName googlecloud.SubscriptionNameFn) message.PubSub {
logger := watermill.NewStdLogger(true, true)

ctx := context.Background()
publisher, err := googlecloud.NewPublisher(
ctx,
googlecloud.PublisherConfig{
Marshaler: marshaler,
Logger: logger,
},
)
require.NoError(t, err)

ctx := context.Background()
subscriber, err := googlecloud.NewSubscriber(
ctx,
googlecloud.SubscriberConfig{
Expand Down Expand Up @@ -112,7 +111,7 @@ func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) {
}
}()

produceMessages(t, ctx, topic1, howManyMessages)
produceMessages(t, topic1, howManyMessages)

select {
case <-allMessagesReceived:
Expand All @@ -125,8 +124,8 @@ func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) {
require.Equal(t, googlecloud.ErrUnexpectedTopic, errors.Cause(err))
}

func produceMessages(t *testing.T, ctx context.Context, topic string, howMany int) {
pub, err := googlecloud.NewPublisher(ctx, googlecloud.PublisherConfig{})
func produceMessages(t *testing.T, topic string, howMany int) {
pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{})
require.NoError(t, err)
defer pub.Close()

Expand Down
14 changes: 10 additions & 4 deletions message/infrastructure/googlecloud/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"fmt"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"time"

"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
Expand Down Expand Up @@ -65,6 +65,9 @@ type SubscriberConfig struct {
// Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool

// InitializeTimeout defines the timeout for initializing topics.
InitializeTimeout time.Duration

// Settings for cloud.google.com/go/pubsub client library.
ReceiveSettings pubsub.ReceiveSettings
SubscriptionConfig pubsub.SubscriptionConfig
Expand Down Expand Up @@ -93,6 +96,9 @@ func (c *SubscriberConfig) setDefaults() {
if c.GenerateSubscriptionName == nil {
c.GenerateSubscriptionName = TopicSubscriptionName
}
if c.InitializeTimeout == 0 {
c.InitializeTimeout = time.Second * 10
}
if c.Unmarshaler == nil {
c.Unmarshaler = DefaultMarshalerUnmarshaler{}
}
Expand Down Expand Up @@ -184,7 +190,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
}

func (s *Subscriber) SubscribeInitialize(topic string) (err error) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), s.config.InitializeTimeout)
defer cancel()

subscriptionName := s.config.GenerateSubscriptionName(topic)
Expand Down