Skip to content

Commit

Permalink
changed AddNoPublisherHandler signature (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak authored Jun 1, 2019
1 parent 76edb0c commit b00ac54
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 38 deletions.
12 changes: 6 additions & 6 deletions components/cqrs/command_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,15 @@ func (p CommandProcessor) Handlers() []CommandHandler {
return p.handlers
}

func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) {
func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) {
cmd := handler.NewCommand()
cmdName := p.marshaler.Name(cmd)

if err := p.validateCommand(cmd); err != nil {
return nil, err
}

return func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) error {
cmd := handler.NewCommand()
messageCmdName := p.marshaler.NameFromMessage(msg)

Expand All @@ -148,7 +148,7 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water
"expected_command_type": cmdName,
"received_command_type": messageCmdName,
})
return nil, nil
return nil
}

logger.Debug("Handling command", watermill.LogFields{
Expand All @@ -157,15 +157,15 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water
})

if err := p.marshaler.Unmarshal(msg, cmd); err != nil {
return nil, err
return err
}

if err := handler.Handle(msg.Context(), cmd); err != nil {
logger.Debug("Error when handling command", watermill.LogFields{"err": err})
return nil, err
return err
}

return nil, nil
return nil
}, nil
}

Expand Down
12 changes: 6 additions & 6 deletions components/cqrs/event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,15 @@ func (p EventProcessor) Handlers() []EventHandler {
return p.handlers
}

func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) {
func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill.LoggerAdapter) (message.NoPublishHandlerFunc, error) {
initEvent := handler.NewEvent()
expectedEventName := p.marshaler.Name(initEvent)

if err := p.validateEvent(initEvent); err != nil {
return nil, err
}

return func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) error {
event := handler.NewEvent()
messageEventName := p.marshaler.NameFromMessage(msg)

Expand All @@ -133,7 +133,7 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill
"expected_event_type": expectedEventName,
"received_event_type": messageEventName,
})
return nil, nil
return nil
}

logger.Debug("Handling event", watermill.LogFields{
Expand All @@ -142,15 +142,15 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill
})

if err := p.marshaler.Unmarshal(msg, event); err != nil {
return nil, err
return err
}

if err := handler.Handle(msg.Context(), event); err != nil {
logger.Debug("Error when handling event", watermill.LogFields{"err": err})
return nil, err
return err
}

return nil, nil
return nil
}, nil
}

Expand Down
11 changes: 9 additions & 2 deletions message/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ var (
// (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).
type HandlerFunc func(msg *Message) ([]*Message, error)

// NoPublishHandlerFunc is HandlerFunc alternative, which doesn't produce any messages.
type NoPublishHandlerFunc func(msg *Message) error

// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
// It can execute something before handler (for example: modify consumed message)
// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
Expand Down Expand Up @@ -228,9 +231,13 @@ func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc HandlerFunc,
handlerFunc NoPublishHandlerFunc,
) {
r.AddHandler(handlerName, subscribeTopic, subscriber, "", disabledPublisher{}, handlerFunc)
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
return nil, handlerFunc(msg)
}

r.AddHandler(handlerName, subscribeTopic, subscriber, "", disabledPublisher{}, handlerFuncAdapter)
}

// Run runs all plugins and handlers and starts subscribing to provided topics.
Expand Down
43 changes: 19 additions & 24 deletions message/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func TestRouter_functional(t *testing.T) {
"test_subscriber_2",
subscribeTopic,
pubSub,
func(msg *message.Message) (producedMessages []*message.Message, err error) {
func(msg *message.Message) (err error) {
receivedMessagesCh2 <- msg
return nil, nil
return nil
},
)

Expand Down Expand Up @@ -133,15 +133,15 @@ func TestRouter_functional_nack(t *testing.T) {
"test_subscriber_1",
"subscribe_topic",
pubSub,
func(msg *message.Message) (producedMessages []*message.Message, err error) {
func(msg *message.Message) (err error) {
messageReceived <- msg

if !internal.IsChannelClosed(nackSend) {
msg.Nack()
close(nackSend)
}

return nil, nil
return nil
},
)

Expand Down Expand Up @@ -182,17 +182,17 @@ func TestRouter_stop_when_all_handlers_stopped(t *testing.T) {
"handler_1",
"foo",
pubSub1,
func(msg *message.Message) (messages []*message.Message, e error) {
return nil, nil
func(msg *message.Message) (e error) {
return nil
},
)

r.AddNoPublisherHandler(
"handler_2",
"foo",
pubSub2,
func(msg *message.Message) (messages []*message.Message, e error) {
return nil, nil
func(msg *message.Message) (e error) {
return nil
},
)

Expand Down Expand Up @@ -298,21 +298,15 @@ func TestRouterNoPublisherHandler(t *testing.T) {
)
require.NoError(t, err)

msgReceived := false
wait := make(chan struct{})

r.AddNoPublisherHandler(
"test_no_publisher_handler",
"subscribe_topic",
pubSub,
func(msg *message.Message) (producedMessages []*message.Message, err error) {
if msgReceived {
require.True(t, msg.Ack())
close(wait)
return nil, nil
}
msgReceived = true
return message.Messages{msg}, nil
func(msg *message.Message) (err error) {
close(wait)
return nil
},
)

Expand All @@ -325,12 +319,13 @@ func TestRouterNoPublisherHandler(t *testing.T) {
err = pubSub.Publish("subscribe_topic", publishedMsg)
require.NoError(t, err)

<-wait
select {
case <-wait:
// ok
case <-time.After(time.Second):
t.Fatal("no message received")
}

// handler has no publisher, so the router should complain about it
// however, it returns no error for now (because of how messages are processed in the router),
// so let's just look for the error in the logger.
assert.True(t, logger.HasError(message.ErrOutputInNoPublisherHandler))
require.NoError(t, r.Close())
}

Expand All @@ -351,9 +346,9 @@ func BenchmarkRouterNoPublisherHandler(b *testing.B) {
"handler",
"benchmark_topic",
sub,
func(msg *message.Message) (messages []*message.Message, e error) {
func(msg *message.Message) (e error) {
allProcessedWg.Done()
return nil, nil
return nil
},
)

Expand Down

0 comments on commit b00ac54

Please # to comment.