Skip to content

Commit

Permalink
added router benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak committed Dec 20, 2018
1 parent 71959fc commit c99b3f5
Showing 1 changed file with 120 additions and 0 deletions.
120 changes: 120 additions & 0 deletions message/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package message_test

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -154,6 +155,125 @@ func TestRouter_functional_nack(t *testing.T) {
tests.AssertAllMessagesReceived(t, []*message.Message{publishedMsg, publishedMsg}, messages)
}

type benchMockSubscriber struct {
messagesToSend []*message.Message
}

func (m benchMockSubscriber) Subscribe(topic string) (chan *message.Message, error) {
out := make(chan *message.Message)

go func() {
for _, msg := range m.messagesToSend {
out <- msg
<-msg.Acked()
}

close(out)
}()

return out, nil
}

func (benchMockSubscriber) Close() error {
return nil
}

type nopPublisher struct {
}

func (nopPublisher) Publish(topic string, messages ...*message.Message) error {
return nil
}

func (nopPublisher) Close() error {
return nil
}

func BenchmarkRouterHandler(b *testing.B) {
logger := watermill.NopLogger{}

allProcessedWg := sync.WaitGroup{}
allProcessedWg.Add(b.N)

router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
b.Fatal(err)
}

sub := createBenchSubscriber(b)

if err := router.AddHandler(
"handler",
"benchmark_topic",
"publish_topic",
message.NewPubSub(nopPublisher{}, sub),
func(msg *message.Message) (messages []*message.Message, e error) {
allProcessedWg.Done()
return []*message.Message{msg}, nil
},
); err != nil {
b.Fatal(err)
}

go func() {
allProcessedWg.Wait()
router.Close()
}()

b.ResetTimer()
if err := router.Run(); err != nil {
b.Fatal(err)
}
}

func BenchmarkRouterNoPublisherHandler(b *testing.B) {
logger := watermill.NopLogger{}

allProcessedWg := sync.WaitGroup{}
allProcessedWg.Add(b.N)

router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
b.Fatal(err)
}

sub := createBenchSubscriber(b)

if err := router.AddNoPublisherHandler(
"handler",
"benchmark_topic",
sub,
func(msg *message.Message) (messages []*message.Message, e error) {
allProcessedWg.Done()
return nil, nil
},
); err != nil {
b.Fatal(err)
}

go func() {
allProcessedWg.Wait()
router.Close()
}()

b.ResetTimer()
if err := router.Run(); err != nil {
b.Fatal(err)
}
}

func createBenchSubscriber(b *testing.B) benchMockSubscriber {
var messagesToSend []*message.Message
for i := 0; i < b.N; i++ {
messagesToSend = append(
messagesToSend,
message.NewMessage(uuid.NewV4().String(), []byte(fmt.Sprintf("%d", i))),
)
}

return benchMockSubscriber{messagesToSend}
}

func publishMessagesForHandler(t *testing.T, messagesCount int, pubSub message.PubSub, topicName string) []*message.Message {
var messagesToPublish []*message.Message

Expand Down

0 comments on commit c99b3f5

Please # to comment.