From 2027e898b3410f94baa5dd03176db4b7033939e8 Mon Sep 17 00:00:00 2001 From: Jason Kulatunga Date: Thu, 7 Sep 2023 20:23:51 -0700 Subject: [PATCH] using Server-sent-events (SSE) for notifications support (and background processing) WIP. --- backend/pkg/constants.go | 3 + backend/pkg/web/handler/server_sent_event.go | 36 ++++++ .../pkg/web/middleware/server_sent_event.go | 120 ++++++++++++++++++ backend/pkg/web/server.go | 2 + 4 files changed, 161 insertions(+) create mode 100644 backend/pkg/web/handler/server_sent_event.go create mode 100644 backend/pkg/web/middleware/server_sent_event.go diff --git a/backend/pkg/constants.go b/backend/pkg/constants.go index b16fb2d9a..b02e68602 100644 --- a/backend/pkg/constants.go +++ b/backend/pkg/constants.go @@ -9,6 +9,9 @@ const ( ContextKeyTypeDatabase string = "REPOSITORY" ContextKeyTypeLogger string = "LOGGER" + ContextKeyTypeSSEServer string = "SSE_SERVER" + ContextKeyTypeSSEClientChannel string = "SSE_CLIENT_CHANNEL" + ContextKeyTypeAuthUsername string = "AUTH_USERNAME" ContextKeyTypeAuthToken string = "AUTH_TOKEN" diff --git a/backend/pkg/web/handler/server_sent_event.go b/backend/pkg/web/handler/server_sent_event.go new file mode 100644 index 000000000..e3289cc41 --- /dev/null +++ b/backend/pkg/web/handler/server_sent_event.go @@ -0,0 +1,36 @@ +package handler + +import ( + "github.com/fastenhealth/fasten-onprem/backend/pkg" + "github.com/fastenhealth/fasten-onprem/backend/pkg/web/middleware" + "github.com/gin-gonic/gin" + "io" +) + +// SSEStream is a handler for the server sent event stream (notifications from background processes) +// see: https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go +// see: https://stackoverflow.com/questions/66327142/selectively-send-event-to-particular-clients +// +// test using: +// curl -N -H "Authorization: Bearer xxxxx" http://localhost:9090/api/secure/sse/stream +func SSEStream(c *gin.Context) { + + //logger := c.MustGet(pkg.ContextKeyTypeLogger).(*logrus.Entry) + //databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository) + v, ok := c.Get(pkg.ContextKeyTypeSSEClientChannel) + if !ok { + return + } + clientChan, ok := v.(middleware.ClientChan) + if !ok { + return + } + c.Stream(func(w io.Writer) bool { + // Stream message to client from message channel + if msg, ok := <-clientChan; ok { + c.SSEvent("message", msg) + return true + } + return false + }) +} diff --git a/backend/pkg/web/middleware/server_sent_event.go b/backend/pkg/web/middleware/server_sent_event.go new file mode 100644 index 000000000..9f0f545ad --- /dev/null +++ b/backend/pkg/web/middleware/server_sent_event.go @@ -0,0 +1,120 @@ +package middleware + +import ( + "fmt" + "github.com/fastenhealth/fasten-onprem/backend/pkg" + "github.com/gin-gonic/gin" + "log" + "time" +) + +func SSEHeaderMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("Transfer-Encoding", "chunked") + c.Next() + } +} + +func SSEServerMiddleware() gin.HandlerFunc { + + // Initialize new streaming server + stream := NewSSEServer() + + ///TODO: testing only + go func() { + for { + time.Sleep(time.Second * 10) + now := time.Now().Format("2006-01-02 15:04:05") + currentTime := fmt.Sprintf("The Current Time Is %v", now) + + // Send current time to clients message channel + stream.Message <- currentTime + } + }() + + return func(c *gin.Context) { + // Initialize client channel + clientChan := make(ClientChan) + + // Send new connection to event server + stream.NewClients <- clientChan + + defer func() { + // Send closed connection to event server + stream.ClosedClients <- clientChan + }() + + c.Set(pkg.ContextKeyTypeSSEServer, stream) + c.Set(pkg.ContextKeyTypeSSEClientChannel, clientChan) + + c.Next() + } +} + +//#################################################################################################### +//TODO: this should all be moved into its own package +//#################################################################################################### + +// New event messages are broadcast to all registered client connection channels +// TODO: change this to be use specific channels. +type ClientChan chan string + +// Initialize event and Start procnteessing requests +// this should be a singleton, to ensure that we're always broadcasting to the same clients +// see: https://refactoring.guru/design-patterns/singleton/go/example +func NewSSEServer() (event *SSEvent) { + event = &SSEvent{ + Message: make(chan string), + NewClients: make(chan chan string), + ClosedClients: make(chan chan string), + TotalClients: make(map[chan string]bool), + } + + go event.listen() + + return +} + +// It keeps a list of clients those are currently attached +// and broadcasting events to those clients. +type SSEvent struct { + // Events are pushed to this channel by the main events-gathering routine + Message chan string + + // New client connections + NewClients chan chan string + + // Closed client connections + ClosedClients chan chan string + + // Total client connections + TotalClients map[chan string]bool +} + +// It Listens all incoming requests from clients. +// Handles addition and removal of clients and broadcast messages to clients. +func (stream *SSEvent) listen() { + for { + select { + // Add new available client + case client := <-stream.NewClients: + stream.TotalClients[client] = true + log.Printf("Client added. %d registered clients", len(stream.TotalClients)) + + // Remove closed client + case client := <-stream.ClosedClients: + delete(stream.TotalClients, client) + close(client) + log.Printf("Removed client. %d registered clients", len(stream.TotalClients)) + + // Broadcast message to client + case eventMsg := <-stream.Message: + for clientMessageChan := range stream.TotalClients { + clientMessageChan <- eventMsg + } + } + } +} diff --git a/backend/pkg/web/server.go b/backend/pkg/web/server.go index 45528bc25..c5834e7f6 100644 --- a/backend/pkg/web/server.go +++ b/backend/pkg/web/server.go @@ -70,6 +70,8 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) { secure.POST("/query", handler.QueryResourceFhir) + //server-side-events handler + secure.GET("/sse/stream", middleware.SSEHeaderMiddleware(), middleware.SSEServerMiddleware(), handler.SSEStream) } if ae.Config.GetBool("web.allow_unsafe_endpoints") {