Skip to content

Commit

Permalink
SSE example: more efficient handlers (ThreeDotsLabs#453)
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 authored and thejoeejoee committed Sep 5, 2024
1 parent 70ef1f9 commit 057efe7
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Server Sent Events (htmx)

This is an example project described in [Live website updates with Go, SSE, and htmx](https://threedots.tech/post/live-website-updates-go-sse-htmx/).
20 changes: 17 additions & 3 deletions _examples/real-world-examples/server-sent-events-htmx/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ type PostReactionAdded struct {
}

type PostStatsUpdated struct {
PostID int `json:"post_id"`
ViewsUpdated bool `json:"views_updated"`
ReactionUpdated *string `json:"reaction_updated"`
PostID int `json:"post_id"`
Views int `json:"views"`
ViewsUpdated bool `json:"views_updated"`
Reactions map[string]int `json:"reactions"`
ReactionUpdated *string `json:"reaction_updated"`
}

type Routers struct {
Expand Down Expand Up @@ -98,8 +100,12 @@ func NewRouters(cfg config, repo *Repository) (Routers, error) {
cqrs.NewEventHandler(
"UpdateViews",
func(ctx context.Context, event *PostViewed) error {
var views int
var reactions map[string]int
err = repo.UpdatePost(ctx, event.PostID, func(post *Post) {
post.Views++
views = post.Views
reactions = post.Reactions
})
if err != nil {
return err
Expand All @@ -108,6 +114,8 @@ func NewRouters(cfg config, repo *Repository) (Routers, error) {
statsUpdated := PostStatsUpdated{
PostID: event.PostID,
ViewsUpdated: true,
Views: views,
Reactions: reactions,
}

return eventBus.Publish(ctx, statsUpdated)
Expand All @@ -116,16 +124,22 @@ func NewRouters(cfg config, repo *Repository) (Routers, error) {
cqrs.NewEventHandler(
"UpdateReactions",
func(ctx context.Context, event *PostReactionAdded) error {
var views int
var reactions map[string]int
err := repo.UpdatePost(ctx, event.PostID, func(post *Post) {
post.Reactions[event.ReactionID]++
views = post.Views
reactions = post.Reactions
})
if err != nil {
return err
}

statsUpdated := PostStatsUpdated{
PostID: event.PostID,
Views: views,
ReactionUpdated: &event.ReactionID,
Reactions: reactions,
}

return eventBus.Publish(ctx, statsUpdated)
Expand Down
71 changes: 55 additions & 16 deletions _examples/real-world-examples/server-sent-events-htmx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"main/views"
"net/http"
"strconv"
"sync/atomic"
"time"

watermillhttp "github.com/ThreeDotsLabs/watermill-http/v2/pkg/http"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
Expand Down Expand Up @@ -35,6 +37,8 @@ func NewHandler(repo *Repository, eventBus *cqrs.EventBus, sseRouter watermillht
e.Use(middleware.Recover())
e.Use(middleware.Logger())

counter := sseHandlersCounter{}

e.GET("/", h.AllPosts)
e.POST("/posts/:id/reactions", h.AddReaction)
e.GET("/posts/:id/stats", func(c echo.Context) error {
Expand All @@ -43,7 +47,14 @@ func NewHandler(repo *Repository, eventBus *cqrs.EventBus, sseRouter watermillht

statsHandler(c.Response(), c.Request())
return nil
})
}, counter.Middleware)

go func() {
for {
fmt.Println("SSE handlers count:", counter.Count.Load())
time.Sleep(60 * time.Second)
}
}()

return e
}
Expand Down Expand Up @@ -121,7 +132,20 @@ func (s *statsStream) InitialStreamResponse(w http.ResponseWriter, r *http.Reque
return nil, false
}

resp, err := s.getResponse(r.Context(), postID, nil)
post, err := s.repo.PostByID(r.Context(), postID)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("could not get post"))
return nil, false
}

stats := PostStats{
ID: post.ID,
Views: post.Views,
Reactions: post.Reactions,
}

resp, err := newPostStatsView(r.Context(), stats)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
Expand Down Expand Up @@ -150,7 +174,15 @@ func (s *statsStream) NextStreamResponse(r *http.Request, msg *message.Message)
return "", false
}

resp, err := s.getResponse(r.Context(), postID, &event)
stats := PostStats{
ID: event.PostID,
Views: event.Views,
ViewsUpdated: event.ViewsUpdated,
Reactions: event.Reactions,
ReactionUpdated: event.ReactionUpdated,
}

resp, err := newPostStatsView(r.Context(), stats)
if err != nil {
fmt.Println("could not get response: " + err.Error())
return nil, false
Expand All @@ -159,34 +191,29 @@ func (s *statsStream) NextStreamResponse(r *http.Request, msg *message.Message)
return resp, true
}

func (s *statsStream) getResponse(ctx context.Context, postID int, event *PostStatsUpdated) (interface{}, error) {
post, err := s.repo.PostByID(ctx, postID)
if err != nil {
return nil, err
}

func newPostStatsView(ctx context.Context, stats PostStats) (interface{}, error) {
var reactions []views.Reaction

for _, r := range allReactions {
reactions = append(reactions, views.Reaction{
ID: r.ID,
Label: r.Label,
Count: fmt.Sprint(post.Reactions[r.ID]),
JustChanged: event != nil && event.ReactionUpdated != nil && *event.ReactionUpdated == r.ID,
Count: fmt.Sprint(stats.Reactions[r.ID]),
JustChanged: stats.ReactionUpdated != nil && *stats.ReactionUpdated == r.ID,
})
}

stats := views.PostStats{
PostID: fmt.Sprint(post.ID),
view := views.PostStats{
PostID: fmt.Sprint(stats.ID),
Views: views.PostViews{
Count: fmt.Sprint(post.Views),
JustChanged: event != nil && event.ViewsUpdated,
Count: fmt.Sprint(stats.Views),
JustChanged: stats.ViewsUpdated,
},
Reactions: reactions,
}

var buffer bytes.Buffer
err = views.PostStatsView(stats).Render(ctx, &buffer)
err := views.PostStatsView(view).Render(ctx, &buffer)
if err != nil {
return nil, err
}
Expand All @@ -202,3 +229,15 @@ func newPostView(p Post) views.Post {
Date: p.CreatedAt.Format("02 Jan 2006 15:04"),
}
}

type sseHandlersCounter struct {
Count atomic.Int64
}

func (s *sseHandlersCounter) Middleware(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
s.Count.Add(1)
defer s.Count.Add(-1)
return next(c)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@ type Post struct {
Views int
Reactions map[string]int
}

type PostStats struct {
ID int
Views int
ViewsUpdated bool
Reactions map[string]int
ReactionUpdated *string
}

0 comments on commit 057efe7

Please # to comment.