From 057efe777ca24dab38eeb2b599a544798497ede5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Tue, 18 Jun 2024 11:26:31 +0200 Subject: [PATCH] SSE example: more efficient handlers (#453) --- .../server-sent-events-htmx/README.md | 3 + .../server-sent-events-htmx/events.go | 20 +++++- .../server-sent-events-htmx/http.go | 71 ++++++++++++++----- .../server-sent-events-htmx/models.go | 8 +++ 4 files changed, 83 insertions(+), 19 deletions(-) create mode 100644 _examples/real-world-examples/server-sent-events-htmx/README.md diff --git a/_examples/real-world-examples/server-sent-events-htmx/README.md b/_examples/real-world-examples/server-sent-events-htmx/README.md new file mode 100644 index 000000000..a1a73efc3 --- /dev/null +++ b/_examples/real-world-examples/server-sent-events-htmx/README.md @@ -0,0 +1,3 @@ +# Server Sent Events (htmx) + +This is an example project described in [Live website updates with Go, SSE, and htmx](https://threedots.tech/post/live-website-updates-go-sse-htmx/). diff --git a/_examples/real-world-examples/server-sent-events-htmx/events.go b/_examples/real-world-examples/server-sent-events-htmx/events.go index 9ce1d9718..8b475da8e 100644 --- a/_examples/real-world-examples/server-sent-events-htmx/events.go +++ b/_examples/real-world-examples/server-sent-events-htmx/events.go @@ -24,9 +24,11 @@ type PostReactionAdded struct { } type PostStatsUpdated struct { - PostID int `json:"post_id"` - ViewsUpdated bool `json:"views_updated"` - ReactionUpdated *string `json:"reaction_updated"` + PostID int `json:"post_id"` + Views int `json:"views"` + ViewsUpdated bool `json:"views_updated"` + Reactions map[string]int `json:"reactions"` + ReactionUpdated *string `json:"reaction_updated"` } type Routers struct { @@ -98,8 +100,12 @@ func NewRouters(cfg config, repo *Repository) (Routers, error) { cqrs.NewEventHandler( "UpdateViews", func(ctx context.Context, event *PostViewed) error { + var views int + var reactions map[string]int err = repo.UpdatePost(ctx, event.PostID, func(post *Post) { post.Views++ + views = post.Views + reactions = post.Reactions }) if err != nil { return err @@ -108,6 +114,8 @@ func NewRouters(cfg config, repo *Repository) (Routers, error) { statsUpdated := PostStatsUpdated{ PostID: event.PostID, ViewsUpdated: true, + Views: views, + Reactions: reactions, } return eventBus.Publish(ctx, statsUpdated) @@ -116,8 +124,12 @@ func NewRouters(cfg config, repo *Repository) (Routers, error) { cqrs.NewEventHandler( "UpdateReactions", func(ctx context.Context, event *PostReactionAdded) error { + var views int + var reactions map[string]int err := repo.UpdatePost(ctx, event.PostID, func(post *Post) { post.Reactions[event.ReactionID]++ + views = post.Views + reactions = post.Reactions }) if err != nil { return err @@ -125,7 +137,9 @@ func NewRouters(cfg config, repo *Repository) (Routers, error) { statsUpdated := PostStatsUpdated{ PostID: event.PostID, + Views: views, ReactionUpdated: &event.ReactionID, + Reactions: reactions, } return eventBus.Publish(ctx, statsUpdated) diff --git a/_examples/real-world-examples/server-sent-events-htmx/http.go b/_examples/real-world-examples/server-sent-events-htmx/http.go index f7d878075..dea23e7f9 100644 --- a/_examples/real-world-examples/server-sent-events-htmx/http.go +++ b/_examples/real-world-examples/server-sent-events-htmx/http.go @@ -8,6 +8,8 @@ import ( "main/views" "net/http" "strconv" + "sync/atomic" + "time" watermillhttp "github.com/ThreeDotsLabs/watermill-http/v2/pkg/http" "github.com/ThreeDotsLabs/watermill/components/cqrs" @@ -35,6 +37,8 @@ func NewHandler(repo *Repository, eventBus *cqrs.EventBus, sseRouter watermillht e.Use(middleware.Recover()) e.Use(middleware.Logger()) + counter := sseHandlersCounter{} + e.GET("/", h.AllPosts) e.POST("/posts/:id/reactions", h.AddReaction) e.GET("/posts/:id/stats", func(c echo.Context) error { @@ -43,7 +47,14 @@ func NewHandler(repo *Repository, eventBus *cqrs.EventBus, sseRouter watermillht statsHandler(c.Response(), c.Request()) return nil - }) + }, counter.Middleware) + + go func() { + for { + fmt.Println("SSE handlers count:", counter.Count.Load()) + time.Sleep(60 * time.Second) + } + }() return e } @@ -121,7 +132,20 @@ func (s *statsStream) InitialStreamResponse(w http.ResponseWriter, r *http.Reque return nil, false } - resp, err := s.getResponse(r.Context(), postID, nil) + post, err := s.repo.PostByID(r.Context(), postID) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("could not get post")) + return nil, false + } + + stats := PostStats{ + ID: post.ID, + Views: post.Views, + Reactions: post.Reactions, + } + + resp, err := newPostStatsView(r.Context(), stats) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) @@ -150,7 +174,15 @@ func (s *statsStream) NextStreamResponse(r *http.Request, msg *message.Message) return "", false } - resp, err := s.getResponse(r.Context(), postID, &event) + stats := PostStats{ + ID: event.PostID, + Views: event.Views, + ViewsUpdated: event.ViewsUpdated, + Reactions: event.Reactions, + ReactionUpdated: event.ReactionUpdated, + } + + resp, err := newPostStatsView(r.Context(), stats) if err != nil { fmt.Println("could not get response: " + err.Error()) return nil, false @@ -159,34 +191,29 @@ func (s *statsStream) NextStreamResponse(r *http.Request, msg *message.Message) return resp, true } -func (s *statsStream) getResponse(ctx context.Context, postID int, event *PostStatsUpdated) (interface{}, error) { - post, err := s.repo.PostByID(ctx, postID) - if err != nil { - return nil, err - } - +func newPostStatsView(ctx context.Context, stats PostStats) (interface{}, error) { var reactions []views.Reaction for _, r := range allReactions { reactions = append(reactions, views.Reaction{ ID: r.ID, Label: r.Label, - Count: fmt.Sprint(post.Reactions[r.ID]), - JustChanged: event != nil && event.ReactionUpdated != nil && *event.ReactionUpdated == r.ID, + Count: fmt.Sprint(stats.Reactions[r.ID]), + JustChanged: stats.ReactionUpdated != nil && *stats.ReactionUpdated == r.ID, }) } - stats := views.PostStats{ - PostID: fmt.Sprint(post.ID), + view := views.PostStats{ + PostID: fmt.Sprint(stats.ID), Views: views.PostViews{ - Count: fmt.Sprint(post.Views), - JustChanged: event != nil && event.ViewsUpdated, + Count: fmt.Sprint(stats.Views), + JustChanged: stats.ViewsUpdated, }, Reactions: reactions, } var buffer bytes.Buffer - err = views.PostStatsView(stats).Render(ctx, &buffer) + err := views.PostStatsView(view).Render(ctx, &buffer) if err != nil { return nil, err } @@ -202,3 +229,15 @@ func newPostView(p Post) views.Post { Date: p.CreatedAt.Format("02 Jan 2006 15:04"), } } + +type sseHandlersCounter struct { + Count atomic.Int64 +} + +func (s *sseHandlersCounter) Middleware(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + s.Count.Add(1) + defer s.Count.Add(-1) + return next(c) + } +} diff --git a/_examples/real-world-examples/server-sent-events-htmx/models.go b/_examples/real-world-examples/server-sent-events-htmx/models.go index cb8ce69ab..cf5ec6e0b 100644 --- a/_examples/real-world-examples/server-sent-events-htmx/models.go +++ b/_examples/real-world-examples/server-sent-events-htmx/models.go @@ -48,3 +48,11 @@ type Post struct { Views int Reactions map[string]int } + +type PostStats struct { + ID int + Views int + ViewsUpdated bool + Reactions map[string]int + ReactionUpdated *string +}