diff --git a/_examples/simple-app/app2/counter.go b/_examples/simple-app/app2/counter.go index 6eb299805..e6f2e3130 100644 --- a/_examples/simple-app/app2/counter.go +++ b/_examples/simple-app/app2/counter.go @@ -35,6 +35,8 @@ type PostsCounter struct { } func (p PostsCounter) Count(msg *message.Message) ([]*message.Message, error) { + // in production use when implementing counter we probably want to make some kind of deduplication here + newCount, err := p.countStorage.CountAdd() if err != nil { return nil, errors.Wrap(err, "cannot add count") diff --git a/_examples/simple-app/app2/feed_updater.go b/_examples/simple-app/app2/feed_updater.go index 540396cd9..91ff5dd95 100644 --- a/_examples/simple-app/app2/feed_updater.go +++ b/_examples/simple-app/app2/feed_updater.go @@ -9,6 +9,13 @@ import ( "github.com/pkg/errors" ) +// intentionally not importing type from app1, because we don't need all data and we want to avoid coupling +type postAdded struct { + OccurredOn time.Time `json:"occurred_on"` + Author string `json:"author"` + Title string `json:"title"` +} + type feedStorage interface { AddToFeed(title, author string, time time.Time) error } diff --git a/_examples/simple-app/app2/main.go b/_examples/simple-app/app2/main.go index 54e7733f7..5d496b4c4 100644 --- a/_examples/simple-app/app2/main.go +++ b/_examples/simple-app/app2/main.go @@ -12,12 +12,6 @@ import ( "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka" ) -type postAdded struct { - OccurredOn time.Time `json:"occurred_on"` - Author string `json:"author"` - Title string `json:"title"` -} - var ( marshaler = kafka.DefaultMarshaler{} brokers = []string{"localhost:9092"} @@ -51,16 +45,31 @@ func main() { } h.AddMiddleware( + // some, simple metrics newMetricsMiddleware().Middleware, + + // retry middleware retries message processing if error occurred in handler poisonQueue.Middleware, + + // if retries limit was exceeded, message is sent to poison queue (poison_queue topic) retryMiddleware.Middleware, + + // recovered recovers panic from handlers middleware.Recoverer, + + // correlation ID middleware adds to every produced message correlation id of consumed message, + // useful for debugging middleware.CorrelationID, + + // simulating error or panic from handler middleware.RandomFail(0.1), middleware.RandomPanic(0.1), ) + + // close router when SIGTERM is sent h.AddPlugin(plugin.SignalsHandler) + // handler which just counts added posts h.AddHandler( "posts_counter", "app1-posts_published", @@ -68,6 +77,11 @@ func main() { message.NewPubSub(pub, createSubscriber("app2-posts_counter_v2", logger)), PostsCounter{memoryCountStorage{new(int64)}}.Count, ) + + // handler which generates "feed" from events post + // + // this implementation just prints it to stdout, + // but production ready implementation would save posts to some persistent storage h.AddNoPublisherHandler( "feed_generator", "app1-posts_published",