diff --git a/message/router_test.go b/message/router_test.go index 4d94c86a7..1bf4a51be 100644 --- a/message/router_test.go +++ b/message/router_test.go @@ -2,6 +2,7 @@ package message_test import ( "fmt" + "sync" "testing" "time" @@ -154,6 +155,125 @@ func TestRouter_functional_nack(t *testing.T) { tests.AssertAllMessagesReceived(t, []*message.Message{publishedMsg, publishedMsg}, messages) } +type benchMockSubscriber struct { + messagesToSend []*message.Message +} + +func (m benchMockSubscriber) Subscribe(topic string) (chan *message.Message, error) { + out := make(chan *message.Message) + + go func() { + for _, msg := range m.messagesToSend { + out <- msg + <-msg.Acked() + } + + close(out) + }() + + return out, nil +} + +func (benchMockSubscriber) Close() error { + return nil +} + +type nopPublisher struct { +} + +func (nopPublisher) Publish(topic string, messages ...*message.Message) error { + return nil +} + +func (nopPublisher) Close() error { + return nil +} + +func BenchmarkRouterHandler(b *testing.B) { + logger := watermill.NopLogger{} + + allProcessedWg := sync.WaitGroup{} + allProcessedWg.Add(b.N) + + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + b.Fatal(err) + } + + sub := createBenchSubscriber(b) + + if err := router.AddHandler( + "handler", + "benchmark_topic", + "publish_topic", + message.NewPubSub(nopPublisher{}, sub), + func(msg *message.Message) (messages []*message.Message, e error) { + allProcessedWg.Done() + return []*message.Message{msg}, nil + }, + ); err != nil { + b.Fatal(err) + } + + go func() { + allProcessedWg.Wait() + router.Close() + }() + + b.ResetTimer() + if err := router.Run(); err != nil { + b.Fatal(err) + } +} + +func BenchmarkRouterNoPublisherHandler(b *testing.B) { + logger := watermill.NopLogger{} + + allProcessedWg := sync.WaitGroup{} + allProcessedWg.Add(b.N) + + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + b.Fatal(err) + } + + sub := createBenchSubscriber(b) + + if err := router.AddNoPublisherHandler( + "handler", + "benchmark_topic", + sub, + func(msg *message.Message) (messages []*message.Message, e error) { + allProcessedWg.Done() + return nil, nil + }, + ); err != nil { + b.Fatal(err) + } + + go func() { + allProcessedWg.Wait() + router.Close() + }() + + b.ResetTimer() + if err := router.Run(); err != nil { + b.Fatal(err) + } +} + +func createBenchSubscriber(b *testing.B) benchMockSubscriber { + var messagesToSend []*message.Message + for i := 0; i < b.N; i++ { + messagesToSend = append( + messagesToSend, + message.NewMessage(uuid.NewV4().String(), []byte(fmt.Sprintf("%d", i))), + ) + } + + return benchMockSubscriber{messagesToSend} +} + func publishMessagesForHandler(t *testing.T, messagesCount int, pubSub message.PubSub, topicName string) []*message.Message { var messagesToPublish []*message.Message