generated from PackagrIO/goweb-template
-
-
Notifications
You must be signed in to change notification settings - Fork 84
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
using Server-sent-events (SSE) for notifications support (and backgro…
…und processing) WIP.
- Loading branch information
Showing
4 changed files
with
161 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package handler | ||
|
||
import ( | ||
"github.com/fastenhealth/fasten-onprem/backend/pkg" | ||
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/middleware" | ||
"github.com/gin-gonic/gin" | ||
"io" | ||
) | ||
|
||
// SSEStream is a handler for the server sent event stream (notifications from background processes) | ||
// see: https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go | ||
// see: https://stackoverflow.com/questions/66327142/selectively-send-event-to-particular-clients | ||
// | ||
// test using: | ||
// curl -N -H "Authorization: Bearer xxxxx" http://localhost:9090/api/secure/sse/stream | ||
func SSEStream(c *gin.Context) { | ||
|
||
//logger := c.MustGet(pkg.ContextKeyTypeLogger).(*logrus.Entry) | ||
//databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository) | ||
v, ok := c.Get(pkg.ContextKeyTypeSSEClientChannel) | ||
if !ok { | ||
return | ||
} | ||
clientChan, ok := v.(middleware.ClientChan) | ||
if !ok { | ||
return | ||
} | ||
c.Stream(func(w io.Writer) bool { | ||
// Stream message to client from message channel | ||
if msg, ok := <-clientChan; ok { | ||
c.SSEvent("message", msg) | ||
return true | ||
} | ||
return false | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
package middleware | ||
|
||
import ( | ||
"fmt" | ||
"github.com/fastenhealth/fasten-onprem/backend/pkg" | ||
"github.com/gin-gonic/gin" | ||
"log" | ||
"time" | ||
) | ||
|
||
func SSEHeaderMiddleware() gin.HandlerFunc { | ||
return func(c *gin.Context) { | ||
c.Writer.Header().Set("Content-Type", "text/event-stream") | ||
c.Writer.Header().Set("Cache-Control", "no-cache") | ||
c.Writer.Header().Set("Connection", "keep-alive") | ||
c.Writer.Header().Set("Transfer-Encoding", "chunked") | ||
c.Next() | ||
} | ||
} | ||
|
||
func SSEServerMiddleware() gin.HandlerFunc { | ||
|
||
// Initialize new streaming server | ||
stream := NewSSEServer() | ||
|
||
///TODO: testing only | ||
go func() { | ||
for { | ||
time.Sleep(time.Second * 10) | ||
now := time.Now().Format("2006-01-02 15:04:05") | ||
currentTime := fmt.Sprintf("The Current Time Is %v", now) | ||
|
||
// Send current time to clients message channel | ||
stream.Message <- currentTime | ||
} | ||
}() | ||
|
||
return func(c *gin.Context) { | ||
// Initialize client channel | ||
clientChan := make(ClientChan) | ||
|
||
// Send new connection to event server | ||
stream.NewClients <- clientChan | ||
|
||
defer func() { | ||
// Send closed connection to event server | ||
stream.ClosedClients <- clientChan | ||
}() | ||
|
||
c.Set(pkg.ContextKeyTypeSSEServer, stream) | ||
c.Set(pkg.ContextKeyTypeSSEClientChannel, clientChan) | ||
|
||
c.Next() | ||
} | ||
} | ||
|
||
//#################################################################################################### | ||
//TODO: this should all be moved into its own package | ||
//#################################################################################################### | ||
|
||
// New event messages are broadcast to all registered client connection channels | ||
// TODO: change this to be use specific channels. | ||
type ClientChan chan string | ||
|
||
// Initialize event and Start procnteessing requests | ||
// this should be a singleton, to ensure that we're always broadcasting to the same clients | ||
// see: https://refactoring.guru/design-patterns/singleton/go/example | ||
func NewSSEServer() (event *SSEvent) { | ||
event = &SSEvent{ | ||
Message: make(chan string), | ||
NewClients: make(chan chan string), | ||
ClosedClients: make(chan chan string), | ||
TotalClients: make(map[chan string]bool), | ||
} | ||
|
||
go event.listen() | ||
|
||
return | ||
} | ||
|
||
// It keeps a list of clients those are currently attached | ||
// and broadcasting events to those clients. | ||
type SSEvent struct { | ||
// Events are pushed to this channel by the main events-gathering routine | ||
Message chan string | ||
|
||
// New client connections | ||
NewClients chan chan string | ||
|
||
// Closed client connections | ||
ClosedClients chan chan string | ||
|
||
// Total client connections | ||
TotalClients map[chan string]bool | ||
} | ||
|
||
// It Listens all incoming requests from clients. | ||
// Handles addition and removal of clients and broadcast messages to clients. | ||
func (stream *SSEvent) listen() { | ||
for { | ||
select { | ||
// Add new available client | ||
case client := <-stream.NewClients: | ||
stream.TotalClients[client] = true | ||
log.Printf("Client added. %d registered clients", len(stream.TotalClients)) | ||
|
||
// Remove closed client | ||
case client := <-stream.ClosedClients: | ||
delete(stream.TotalClients, client) | ||
close(client) | ||
log.Printf("Removed client. %d registered clients", len(stream.TotalClients)) | ||
|
||
// Broadcast message to client | ||
case eventMsg := <-stream.Message: | ||
for clientMessageChan := range stream.TotalClients { | ||
clientMessageChan <- eventMsg | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters