diff --git a/backend/pkg/event_bus/event_bus.go b/backend/pkg/event_bus/event_bus.go index 085bfcdf0..cf4f6d1ed 100644 --- a/backend/pkg/event_bus/event_bus.go +++ b/backend/pkg/event_bus/event_bus.go @@ -29,13 +29,23 @@ func GetEventBusServer(logger logrus.FieldLogger) *EventBus { singletonEventBusInstance = &EventBus{ Logger: logger, Message: make(chan EventBusMessage), - NewListener: make(chan EventBusListener), - ClosedListener: make(chan EventBusListener), - TotalRoomListeners: make(map[string][]EventBusListener), + NewListener: make(chan *EventBusListener), + ClosedListener: make(chan *EventBusListener), + TotalRoomListeners: make(map[string][]*EventBusListener), } // Start processing requests go singletonEventBusInstance.listen() + + //background keep-alive for testing + //go func() { + // for { + // time.Sleep(time.Second * 10) + // // Send current time + // singletonEventBusInstance.PublishMessage(models.NewEventKeepAlive("keep-alive")) + // } + //}() + } else { fmt.Println("Single instance already created.") } @@ -55,13 +65,13 @@ type EventBus struct { Message chan EventBusMessage // New client connections - NewListener chan EventBusListener + NewListener chan *EventBusListener // Closed client connections - ClosedListener chan EventBusListener + ClosedListener chan *EventBusListener // Total client connections - TotalRoomListeners map[string][]EventBusListener + TotalRoomListeners map[string][]*EventBusListener } type EventBusListener struct { @@ -84,7 +94,7 @@ func (bus *EventBus) listen() { case listener := <-bus.NewListener: //check if this userId room already exists, or create it if _, exists := bus.TotalRoomListeners[listener.UserID]; !exists { - bus.TotalRoomListeners[listener.UserID] = []EventBusListener{} + bus.TotalRoomListeners[listener.UserID] = []*EventBusListener{} } bus.TotalRoomListeners[listener.UserID] = append(bus.TotalRoomListeners[listener.UserID], listener) log.Printf("Listener added to room: `%s`. %d registered listeners", listener.UserID, len(bus.TotalRoomListeners[listener.UserID])) diff --git a/backend/pkg/models/event.go b/backend/pkg/models/event.go index 7c5a8d890..944665d96 100644 --- a/backend/pkg/models/event.go +++ b/backend/pkg/models/event.go @@ -3,6 +3,7 @@ package models type EventSourceSyncStatus string const ( + EventTypeKeepAlive EventSourceSyncStatus = "keep_alive" EventTypeSourceSync EventSourceSyncStatus = "source_sync" EventTypeSourceComplete EventSourceSyncStatus = "source_complete" ) diff --git a/backend/pkg/models/event_keep_alive.go b/backend/pkg/models/event_keep_alive.go new file mode 100644 index 000000000..a78ba650b --- /dev/null +++ b/backend/pkg/models/event_keep_alive.go @@ -0,0 +1,18 @@ +package models + +import "time" + +type EventKeepAlive struct { + *Event `json:",inline"` + Time string `json:"time"` +} + +func NewEventKeepAlive(userID string) *EventKeepAlive { + return &EventKeepAlive{ + Event: &Event{ + UserID: userID, + EventType: EventTypeKeepAlive, + }, + Time: time.Now().Format("2006-01-02 15:04:05"), + } +} diff --git a/backend/pkg/web/handler/server_sent_event.go b/backend/pkg/web/handler/server_sent_event.go index 634fd6659..33924f124 100644 --- a/backend/pkg/web/handler/server_sent_event.go +++ b/backend/pkg/web/handler/server_sent_event.go @@ -23,7 +23,7 @@ func SSEStream(c *gin.Context) { log.Printf("could not get client channel from context") return } - listener, ok := v.(event_bus.EventBusListener) + listener, ok := v.(*event_bus.EventBusListener) if !ok { return } diff --git a/backend/pkg/web/middleware/server_sent_event.go b/backend/pkg/web/middleware/server_sent_event.go index 52174b4cf..3c876c250 100644 --- a/backend/pkg/web/middleware/server_sent_event.go +++ b/backend/pkg/web/middleware/server_sent_event.go @@ -41,15 +41,15 @@ func SSEEventBusServerMiddleware(logger *logrus.Entry) gin.HandlerFunc { } // Send new connection to event server - bus.NewListener <- clientListener + bus.NewListener <- &clientListener defer func() { // Send closed connection to event server - bus.ClosedListener <- clientListener + bus.ClosedListener <- &clientListener }() - c.Set(pkg.ContextKeyTypeSSEEventBusServer, bus) - c.Set(pkg.ContextKeyTypeSSEClientChannel, clientListener) + c.Set(pkg.ContextKeyTypeSSEEventBusServer, &bus) + c.Set(pkg.ContextKeyTypeSSEClientChannel, &clientListener) c.Next() } diff --git a/frontend/src/app/services/event-bus.service.ts b/frontend/src/app/services/event-bus.service.ts index 707b6991d..37dde593c 100644 --- a/frontend/src/app/services/event-bus.service.ts +++ b/frontend/src/app/services/event-bus.service.ts @@ -37,6 +37,7 @@ export class EventBusService { this.authService.IsAuthenticatedSubject.subscribe((isAuthenticated) => { console.log("isAuthenticated changed:", isAuthenticated) if(isAuthenticated){ + console.log("Started listening to event bus") this.eventBusSubscription = this.listenEventBus().subscribe((event: Event | EventSourceSync | EventSourceComplete)=>{ console.log("eventbus event:", event) //TODO: start toasts. @@ -48,6 +49,7 @@ export class EventBusService { }) } else { //no longer authenticated, unsubscribe from eventbus and abort/terminate connection + console.log("Stopped listening to event bus") this.abortEventBus() } });