Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

changed AddNoPublisherHandler signature #82

Merged
merged 3 commits into from
Jun 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions components/cqrs/command_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,15 @@ func (p CommandProcessor) Handlers() []CommandHandler {
return p.handlers
}

func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) {
func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) {
cmd := handler.NewCommand()
cmdName := p.marshaler.Name(cmd)

if err := p.validateCommand(cmd); err != nil {
return nil, err
}

return func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) error {
cmd := handler.NewCommand()
messageCmdName := p.marshaler.NameFromMessage(msg)

Expand All @@ -148,7 +148,7 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water
"expected_command_type": cmdName,
"received_command_type": messageCmdName,
})
return nil, nil
return nil
}

logger.Debug("Handling command", watermill.LogFields{
Expand All @@ -157,15 +157,15 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water
})

if err := p.marshaler.Unmarshal(msg, cmd); err != nil {
return nil, err
return err
}

if err := handler.Handle(msg.Context(), cmd); err != nil {
logger.Debug("Error when handling command", watermill.LogFields{"err": err})
return nil, err
return err
}

return nil, nil
return nil
}, nil
}

Expand Down
12 changes: 6 additions & 6 deletions components/cqrs/event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,15 @@ func (p EventProcessor) Handlers() []EventHandler {
return p.handlers
}

func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) {
func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) {
initEvent := handler.NewEvent()
expectedEventName := p.marshaler.Name(initEvent)

if err := p.validateEvent(initEvent); err != nil {
return nil, err
}

return func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) error {
event := handler.NewEvent()
messageEventName := p.marshaler.NameFromMessage(msg)

Expand All @@ -133,7 +133,7 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill
"expected_event_type": expectedEventName,
"received_event_type": messageEventName,
})
return nil, nil
return nil
}

logger.Debug("Handling event", watermill.LogFields{
Expand All @@ -142,15 +142,15 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill
})

if err := p.marshaler.Unmarshal(msg, event); err != nil {
return nil, err
return err
}

if err := handler.Handle(msg.Context(), event); err != nil {
logger.Debug("Error when handling event", watermill.LogFields{"err": err})
return nil, err
return err
}

return nil, nil
return nil
}, nil
}

Expand Down
11 changes: 9 additions & 2 deletions message/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ var (
// (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).
type HandlerFunc func(msg *Message) ([]*Message, error)

// NoPublishHandlerFunc is HandlerFunc alternative, which doesn't produce any messages.
type NoPublishHandlerFunc func(msg *Message) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about ReadOnlyHandlerFunc or ConsumeOnlyHandlerFunc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds better @maclav3 ? and does we want to also change AddNoPublisherHandler?


// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
// It can execute something before handler (for example: modify consumed message)
// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
Expand Down Expand Up @@ -228,9 +231,13 @@ func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc HandlerFunc,
handlerFunc NoPublishHandlerFunc,
) {
r.AddHandler(handlerName, subscribeTopic, subscriber, "", disabledPublisher{}, handlerFunc)
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
return nil, handlerFunc(msg)
}

r.AddHandler(handlerName, subscribeTopic, subscriber, "", disabledPublisher{}, handlerFuncAdapter)
}

// Run runs all plugins and handlers and starts subscribing to provided topics.
Expand Down
43 changes: 19 additions & 24 deletions message/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func TestRouter_functional(t *testing.T) {
"test_subscriber_2",
subscribeTopic,
pubSub,
func(msg *message.Message) (producedMessages []*message.Message, err error) {
func(msg *message.Message) (err error) {
receivedMessagesCh2 <- msg
return nil, nil
return nil
},
)

Expand Down Expand Up @@ -133,15 +133,15 @@ func TestRouter_functional_nack(t *testing.T) {
"test_subscriber_1",
"subscribe_topic",
pubSub,
func(msg *message.Message) (producedMessages []*message.Message, err error) {
func(msg *message.Message) (err error) {
messageReceived <- msg

if !internal.IsChannelClosed(nackSend) {
msg.Nack()
close(nackSend)
}

return nil, nil
return nil
},
)

Expand Down Expand Up @@ -182,17 +182,17 @@ func TestRouter_stop_when_all_handlers_stopped(t *testing.T) {
"handler_1",
"foo",
pubSub1,
func(msg *message.Message) (messages []*message.Message, e error) {
return nil, nil
func(msg *message.Message) (e error) {
return nil
},
)

r.AddNoPublisherHandler(
"handler_2",
"foo",
pubSub2,
func(msg *message.Message) (messages []*message.Message, e error) {
return nil, nil
func(msg *message.Message) (e error) {
return nil
},
)

Expand Down Expand Up @@ -298,21 +298,15 @@ func TestRouterNoPublisherHandler(t *testing.T) {
)
require.NoError(t, err)

msgReceived := false
wait := make(chan struct{})

r.AddNoPublisherHandler(
"test_no_publisher_handler",
"subscribe_topic",
pubSub,
func(msg *message.Message) (producedMessages []*message.Message, err error) {
if msgReceived {
require.True(t, msg.Ack())
close(wait)
return nil, nil
}
msgReceived = true
return message.Messages{msg}, nil
func(msg *message.Message) (err error) {
close(wait)
return nil
},
)

Expand All @@ -325,12 +319,13 @@ func TestRouterNoPublisherHandler(t *testing.T) {
err = pubSub.Publish("subscribe_topic", publishedMsg)
require.NoError(t, err)

<-wait
select {
case <-wait:
// ok
case <-time.After(time.Second):
t.Fatal("no message received")
}

// handler has no publisher, so the router should complain about it
// however, it returns no error for now (because of how messages are processed in the router),
// so let's just look for the error in the logger.
assert.True(t, logger.HasError(message.ErrOutputInNoPublisherHandler))
require.NoError(t, r.Close())
}

Expand All @@ -351,9 +346,9 @@ func BenchmarkRouterNoPublisherHandler(b *testing.B) {
"handler",
"benchmark_topic",
sub,
func(msg *message.Message) (messages []*message.Message, e error) {
func(msg *message.Message) (e error) {
allProcessedWg.Done()
return nil, nil
return nil
},
)

Expand Down