Skip to content

feat: add MQTT transport messaging #459

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ There you will find Express.js, TypeScript and Websocket examples.
| AMQP Protocol Binding | :x: | :x: |
| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Protocol Binding | :x: | :heavy_check_mark: |
| MQTT Protocol Binding | :x: | :x: |
| MQTT Protocol Binding | :heavy_check_mark: | :x: |
| NATS Protocol Binding | :x: | :x: |

---
Expand All @@ -176,6 +176,9 @@ There you will find Express.js, TypeScript and Websocket examples.
| Kafka Binary | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Structured | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Batch | :heavy_check_mark: | :heavy_check_mark:
| MQTT Binary | :heavy_check_mark: | :heavy_check_mark: |
| MQTT Structured | :heavy_check_mark: | :heavy_check_mark: |

## Community

- There are bi-weekly calls immediately following the [Serverless/CloudEvents
Expand Down
5 changes: 4 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";

import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter";
import {
Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message,
Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message, MQTT, MQTTMessage, MQTTMessageFactory,
Serializer, Deserializer } from "./message";

import CONSTANTS from "./constants";
Expand All @@ -32,6 +32,9 @@ export {
Kafka,
KafkaEvent,
KafkaMessage,
MQTT,
MQTTMessage,
MQTTMessageFactory,
// From transport
TransportFunction,
EmitterFunction,
Expand Down
1 change: 1 addition & 0 deletions src/message/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { CloudEventV1 } from "..";
// reexport the protocol bindings
export * from "./http";
export * from "./kafka";
export * from "./mqtt";

/**
* Binding is an interface for transport protocols to implement,
Expand Down
148 changes: 148 additions & 0 deletions src/message/mqtt/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

import { Binding, Deserializer, CloudEvent, CloudEventV1, CONSTANTS, Message, ValidationError, Headers } from "../..";

export {
MQTT,
MQTTMessage,
MQTTMessageFactory
};

/**
* Extends the base {@linkcode Message} interface to include MQTT attributes, some of which
* are aliases of the {Message} attributes.
*/
interface MQTTMessage<T> extends Message<T> {
/**
* Identifies this message as a PUBLISH packet. MQTTMessages created with
* the `binary` and `structured` Serializers will contain a "Content Type"
* property in the PUBLISH record.
* @see https://github.com/cloudevents/spec/blob/v1.0.1/mqtt-protocol-binding.md#3-mqtt-publish-message-mapping
*/
PUBLISH: Record<string, string | undefined> | undefined
/**
* Alias of {Message#body}
*/
payload: T | undefined,
/**
* Alias of {Message#headers}
*/
"User Properties": Headers | undefined
}

/**
* Binding for MQTT transport support
* @implements @linkcode Binding
*/
const MQTT: Binding = {
binary,
structured,
toEvent: toEvent as Deserializer,
isEvent
};

/**
* Converts a CloudEvent into an MQTTMessage<T> with the event's data as the message payload
* @param {CloudEventV1} event a CloudEvent
* @returns {MQTTMessage<T>} the event serialized as an MQTTMessage<T> with binary encoding
* @implements {Serializer}
*/
function binary<T>(event: CloudEventV1<T>): MQTTMessage<T> {
let properties;
if (event instanceof CloudEvent) {
properties = event.toJSON();
} else {
properties = event;
}
const body = properties.data as T;
delete properties.data;

return MQTTMessageFactory(event.datacontenttype as string, properties, body);
}

/**
* Converts a CloudEvent into an MQTTMessage<T> with the event as the message payload
* @param {CloudEventV1} event a CloudEvent
* @returns {MQTTMessage<T>} the event serialized as an MQTTMessage<T> with structured encoding
* @implements {Serializer}
*/
function structured<T>(event: CloudEventV1<T>): MQTTMessage<T> {
let body;
if (event instanceof CloudEvent) {
body = event.toJSON();
} else {
body = event;
}
return MQTTMessageFactory(CONSTANTS.DEFAULT_CE_CONTENT_TYPE, {}, body) as MQTTMessage<T>;
}

/**
* A helper function to create an MQTTMessage<T> object, with "User Properties" as an alias
* for "headers" and "payload" an alias for body, and a "PUBLISH" record with a "Content Type"
* property.
* @param {string} contentType the "Content Type" attribute on PUBLISH
* @param {Record<string, unknown>} headers the headers and "User Properties"
* @param {T} body the message body/payload
* @returns {MQTTMessage<T>} a message initialized with the provided attributes
*/
function MQTTMessageFactory<T>(contentType: string, headers: Record<string, unknown>, body: T): MQTTMessage<T> {
return {
PUBLISH: {
"Content Type": contentType
},
body,
get payload() {
return this.body as T;
},
headers: headers as Headers,
get "User Properties"() {
return this.headers as any;
}
};
}

/**
* Converts an MQTTMessage<T> into a CloudEvent
* @param {Message<T>} message the message to deserialize
* @param {boolean} strict determines if a ValidationError will be thrown on bad input - defaults to false
* @returns {CloudEventV1<T>} an event
* @implements {Deserializer}
*/
function toEvent<T>(message: Message<T>, strict = false): CloudEventV1<T> | CloudEventV1<T>[] {
if (strict && !isEvent(message)) {
throw new ValidationError("No CloudEvent detected");
}
if (isStructuredMessage(message as MQTTMessage<T>)) {
const evt = (typeof message.body === "string") ? JSON.parse(message.body): message.body;
return new CloudEvent({
...evt as CloudEventV1<T>
}, false);
} else {
return new CloudEvent<T>({
...message.headers,
data: message.body as T,
}, false);
}
}

/**
* Determine if the message is a CloudEvent
* @param {Message<T>} message an MQTTMessage
* @returns {boolean} true if the message contains an event
*/
function isEvent<T>(message: Message<T>): boolean {
return isBinaryMessage(message) || isStructuredMessage(message as MQTTMessage<T>);
}

function isBinaryMessage<T>(message: Message<T>): boolean {
return (!!message.headers.id && !!message.headers.source
&& !! message.headers.type && !!message.headers.specversion);
}

function isStructuredMessage<T>(message: MQTTMessage<T>): boolean {
if (!message) { return false; }
return (message.PUBLISH && message?.PUBLISH["Content Type"]?.startsWith(CONSTANTS.MIME_CE_JSON)) || false;
}
Loading