Stream Rxjs Observable values as Server-Sent Events (SSE) using Hapi.
Hapi-rx-sse is a tiny lib which can be used inside a route handler to stream back data as Server Sent Events (SSE).
Any RxJs Observable can be used as a source of events, which allows for interesting composition. The source Observable can encapsulate a Kafka, RabbitMQ, Mongodb oplog consumer... or any other thing which is capable of emitting events.
The composable nature of RxJs allows adding additional operators (map, filter, bufferWithTimeOrCount...) to the source Observable, so that data can be transformed into the expected payload, and can be filtered / enriched in an efficient way.
Check out the Examples section below for some real-world examples.
npm install hapi-rx-sse
Hapi-rx-sse exposes a single stream
function which accepts any RxJs Observable and Hapi's req and reply.
hapiRxSSE.stream(createObservable(), req, reply);
Simply invoke it within a route handler to stream back the Rxjs Observable values as Server-Sent Events.
server.route({
path: '/events/streaming',
method: 'GET',
handler: (req, reply) => {
hapiRxSSE.stream(createObservable(), req, reply);
}
});
function createObservable() {
// ... See the Examples section below for more details on how the Observable can be created
}
The emitted values are expected to be Objects, any of the following optional properties will be mapped to the wire protocol : event
, id
, data
and comment
For example
{
event: 'books.insert',
id: 12345,
data: '{foo: bar}',
comment: 'whatever comments you want to provide'
}
will translate into
event: books.insert\r\n
id: 12345\r\n
data: {foo: bar}\r\n
: whatever comments you want to provide\r\n
\r\n
The examples are located at hapi-rx-sse-examples
- kafka-sse-filter: Defines a route which composes a Kafka Observable ( rx-no-kafka ) with hapi-rx-sse. Supports filtering via query parameters and Last-Event-Id to resume on connection failures.
- ... more coming soon