Skip to content

Commit

Permalink
consolidate event bus methods in EventBusService.
Browse files Browse the repository at this point in the history
  • Loading branch information
AnalogJ committed Sep 10, 2023
1 parent 2b0a1e0 commit 8b6c321
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class MedicalSourcesConnectedComponent implements OnInit {
.then(console.log)
}

this.eventBusService.eventBusSourceSyncMessages.subscribe((event) => {
this.eventBusService.SourceSyncMessages.subscribe((event) => {
this.status[event.source_id] = "token"
})

Expand Down
93 changes: 82 additions & 11 deletions frontend/src/app/services/event-bus.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,28 @@ import {ToastService} from './toast.service';
import {Event} from '../models/events/event';
import {EventSourceComplete} from '../models/events/event_source_complete';
import {EventSourceSync} from '../models/events/event_source_sync';
import {GetEndpointAbsolutePath} from '../../lib/utils/endpoint_absolute_path';
import {environment} from '../../environments/environment';
import {fetchEventSource} from '@microsoft/fetch-event-source';

@Injectable({
providedIn: 'root'
})
export class EventBusService {
eventBusSubscription: Subscription | undefined;
eventBusSourceSyncMessages: Subject<EventSourceSync> = new Subject<EventSourceSync>();
eventBusSourceCompleteMessages: Subject<EventSourceComplete> = new Subject<EventSourceComplete>();

//stores a reference to the event stream observable, which we can listen to for events
private eventBus: Observable<Event> | undefined;
//stores a reference to the event bus abort controller, which we can use to abort the event bus connection
private eventBusAbortController: AbortController | undefined;
//stores a reference to the event bus observable subscription, which we can use to unsubscribe from the event bus
private eventBusSubscription: Subscription | undefined;

public SourceSyncMessages: Subject<EventSourceSync> = new Subject<EventSourceSync>();
public SourceCompleteMessages: Subject<EventSourceComplete> = new Subject<EventSourceComplete>();

constructor(
public router: Router,
public authService: AuthService,
public fastenApiService: FastenApiService,
public toastService: ToastService
) {

Expand All @@ -28,21 +37,83 @@ export class EventBusService {
this.authService.IsAuthenticatedSubject.subscribe((isAuthenticated) => {
console.log("isAuthenticated changed:", isAuthenticated)
if(isAuthenticated){
this.eventBusSubscription = this.fastenApiService.listenEventBus().subscribe((event: Event | EventSourceSync | EventSourceComplete)=>{
this.eventBusSubscription = this.listenEventBus().subscribe((event: Event | EventSourceSync | EventSourceComplete)=>{
console.log("eventbus event:", event)
//TODO: start toasts.
if(event.event_type == "source_sync"){
this.eventBusSourceSyncMessages.next(event as EventSourceSync)
this.SourceSyncMessages.next(event as EventSourceSync)
} else if(event.event_type == "source_complete"){
this.eventBusSourceCompleteMessages.next(event as EventSourceComplete)
this.SourceCompleteMessages.next(event as EventSourceComplete)
}
})
} else {
//no longer authenticated, unsubscribe from eventbus
if(this.eventBusSubscription){
this.eventBusSubscription.unsubscribe()
}
//no longer authenticated, unsubscribe from eventbus and abort/terminate connection
this.abortEventBus()
}
});
}

//Make sure we an cancel the event bus connection & subscription, resetting to a clean state.
abortEventBus() {
if(this.eventBusAbortController){
try {
this.eventBusAbortController.abort()
} catch (e) {
console.log("ignoring, error aborting event bus:", e)
}
}
if(this.eventBusSubscription){
try {
this.eventBusSubscription.unsubscribe()
} catch (e) {
console.log("ignoring, error unsubscribing from event bus:", e)
}
}

this.eventBus = null
this.eventBusAbortController = null
this.eventBusSubscription = null
}

//Listen to the event bus, and return an observable that we can subscribe to.
//this method uses the fetch-event-source library, which is a polyfill for the EventSource API (which does not support Authorization Headers)
//
listenEventBus(): Observable<any> {
//this is a singleton, so if we already have an event bus, return it.

if(this.eventBus){
return this.eventBus
}

let serviceThis = this;
let eventStreamUrl = `${GetEndpointAbsolutePath(globalThis.location, environment.fasten_api_endpoint_base)}/secure/events/stream`
this.eventBusAbortController = new AbortController();
this.eventBus = new Observable(observer => {
fetchEventSource(eventStreamUrl, {
method: 'GET',
headers: {
'Authorization': `Bearer ${this.authService.GetAuthToken()}`
},
onmessage(ev) {
observer.next(JSON.parse(ev.data));
},
onerror(event) {
observer.error(event)
//don't retry, just close the stream
observer.complete()
throw new Error('EventBus error: ' + event);
},
onclose(){
// if the server closes the connection unexpectedly, retry:
serviceThis.abortEventBus()
},
signal: this.eventBusAbortController.signal,
}).then(
() => observer.complete(),
error => observer.error(error)
)
});
return this.eventBus
}

}
25 changes: 4 additions & 21 deletions frontend/src/app/services/fasten-api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import { fetchEventSource } from '@microsoft/fetch-event-source';
})
export class FastenApiService {

private _eventBus: Observable<Event>
private _eventBusAbortController: AbortController

constructor(@Inject(HTTP_CLIENT_TOKEN) private _httpClient: HttpClient, private router: Router, private authService: AuthService) {
}

Expand All @@ -55,27 +58,7 @@ export class FastenApiService {
SECURE ENDPOINTS
*/

listenEventBus(): Observable<any> {
let eventStreamUrl = `${GetEndpointAbsolutePath(globalThis.location, environment.fasten_api_endpoint_base)}/secure/events/stream`

return new Observable(observer => {
fetchEventSource(eventStreamUrl, {
method: 'GET',
headers: {
'Authorization': `Bearer ${this.authService.GetAuthToken()}`
},
onmessage(ev) {
observer.next(JSON.parse(ev.data));
},
onerror(event) {
observer.error(event)
},
}).then(
() => observer.complete(),
error => observer.error(error)
)
});
}
//TODO: Any significant API changes here should also be reflected in EventBusService

getDashboards(): Observable<DashboardConfig[]> {
return this._httpClient.get<any>(`${GetEndpointAbsolutePath(globalThis.location, environment.fasten_api_endpoint_base)}/secure/dashboards`, )
Expand Down

0 comments on commit 8b6c321

Please # to comment.