Skip to content

Commit 803445f

Browse files
committed
feat: add support for kafka transport
This commit extends the `message` package to include Kafka transport. Additionally, some of the type information has changed across the project to more accurately reflect the type of `Message` (by including `T`). Related: #390 Signed-off-by: Lance Ball <lball@redhat.com>
1 parent 320354f commit 803445f

File tree

12 files changed

+642
-59
lines changed

12 files changed

+642
-59
lines changed

package-lock.json

-13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/message/http/index.ts

+7-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import { JSONParser, MappedParser, Parser, parserByContentType } from "../../par
2525
* @param {CloudEvent} event The event to serialize
2626
* @returns {Message} a Message object with headers and body
2727
*/
28-
export function binary<T>(event: CloudEvent<T>): Message {
28+
export function binary<T>(event: CloudEvent<T>): Message<T> {
2929
const contentType: Headers = { [CONSTANTS.HEADER_CONTENT_TYPE]: CONSTANTS.DEFAULT_CONTENT_TYPE };
3030
const headers: Headers = { ...contentType, ...headersFor(event) };
3131
let body = event.data;
@@ -47,7 +47,7 @@ export function binary<T>(event: CloudEvent<T>): Message {
4747
* @param {CloudEvent} event the CloudEvent to be serialized
4848
* @returns {Message} a Message object with headers and body
4949
*/
50-
export function structured<T>(event: CloudEvent<T>): Message {
50+
export function structured<T>(event: CloudEvent<T>): Message<T> {
5151
if (event.data_base64) {
5252
// The event's data is binary - delete it
5353
event = event.cloneWith({ data: undefined });
@@ -67,7 +67,7 @@ export function structured<T>(event: CloudEvent<T>): Message {
6767
* @param {Message} message an incoming Message object
6868
* @returns {boolean} true if this Message is a CloudEvent
6969
*/
70-
export function isEvent(message: Message): boolean {
70+
export function isEvent<T>(message: Message<T>): boolean {
7171
// TODO: this could probably be optimized
7272
try {
7373
deserialize(message);
@@ -84,7 +84,7 @@ export function isEvent(message: Message): boolean {
8484
* @param {Message} message the incoming message
8585
* @return {CloudEvent} A new {CloudEvent} instance
8686
*/
87-
export function deserialize<T>(message: Message): CloudEvent<T> | CloudEvent<T>[] {
87+
export function deserialize<T>(message: Message<T>): CloudEvent<T> | CloudEvent<T>[] {
8888
const cleanHeaders: Headers = sanitize(message.headers);
8989
const mode: Mode = getMode(cleanHeaders);
9090
const version = getVersion(mode, cleanHeaders, message.body);
@@ -157,7 +157,7 @@ function getVersion(mode: Mode, headers: Headers, body: string | Record<string,
157157
* @returns {CloudEvent} an instance of CloudEvent representing the incoming request
158158
* @throws {ValidationError} of the event does not conform to the spec
159159
*/
160-
function parseBinary<T>(message: Message, version: Version): CloudEvent<T> {
160+
function parseBinary<T>(message: Message<T>, version: Version): CloudEvent<T> {
161161
const headers = { ...message.headers };
162162
let body = message.body;
163163

@@ -208,7 +208,7 @@ function parseBinary<T>(message: Message, version: Version): CloudEvent<T> {
208208
* @returns {CloudEvent} a new CloudEvent instance for the provided headers and payload
209209
* @throws {ValidationError} if the payload and header combination do not conform to the spec
210210
*/
211-
function parseStructured<T>(message: Message, version: Version): CloudEvent<T> {
211+
function parseStructured<T>(message: Message<T>, version: Version): CloudEvent<T> {
212212
const payload = message.body;
213213
const headers = message.headers;
214214

@@ -253,7 +253,7 @@ function parseStructured<T>(message: Message, version: Version): CloudEvent<T> {
253253
return new CloudEvent<T>(eventObj as CloudEventV1<T>, false);
254254
}
255255

256-
function parseBatched<T>(message: Message): CloudEvent<T> | CloudEvent<T>[] {
256+
function parseBatched<T>(message: Message<T>): CloudEvent<T> | CloudEvent<T>[] {
257257
const ret: CloudEvent<T>[] = [];
258258
const events = JSON.parse(message.body as string);
259259
events.forEach((element: CloudEvent) => {

src/message/index.ts

+7-5
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ export interface Headers extends IncomingHttpHeaders {
4141
* @property {@linkcode Headers} `headers` - the headers for the event Message
4242
* @property string `body` - the body of the event Message
4343
*/
44-
export interface Message {
44+
export interface Message<T> {
4545
headers: Headers;
46-
body: string | unknown;
46+
body: T | string | Buffer | unknown;
4747
}
4848

4949
/**
@@ -62,7 +62,7 @@ export enum Mode {
6262
* @interface
6363
*/
6464
export interface Serializer {
65-
<T>(event: CloudEvent<T>): Message;
65+
<T>(event: CloudEvent<T>): Message<T>;
6666
}
6767

6868
/**
@@ -71,7 +71,7 @@ export interface Serializer {
7171
* @interface
7272
*/
7373
export interface Deserializer {
74-
<T>(message: Message): CloudEvent<T> | CloudEvent<T>[];
74+
<T>(message: Message<T>): CloudEvent<T> | CloudEvent<T>[];
7575
}
7676

7777
/**
@@ -80,7 +80,7 @@ export interface Deserializer {
8080
* @interface
8181
*/
8282
export interface Detector {
83-
(message: Message): boolean;
83+
<T>(message: Message<T>): boolean;
8484
}
8585

8686
/**
@@ -93,3 +93,5 @@ export const HTTP: Binding = {
9393
toEvent: deserialize as Deserializer,
9494
isEvent: isEvent as Detector,
9595
};
96+
97+
export * from "./kafka";

src/message/kafka/headers.ts

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
Copyright 2021 The CloudEvents Authors
3+
SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
import { CloudEvent, CONSTANTS, Headers } from "../..";
7+
8+
/**
9+
* The set of CloudEvent headers that may exist on a Kafka message
10+
*/
11+
export const KAFKA_CE_HEADERS: { [key: string]: string } = Object.freeze({
12+
/** corresponds to the CloudEvent#id */
13+
ID: "ce_id",
14+
/** corresponds to the CloudEvent#type */
15+
TYPE: "ce_type",
16+
/** corresponds to the CloudEvent#source */
17+
SOURCE: "ce_source",
18+
/** corresponds to the CloudEvent#specversion */
19+
SPEC_VERSION: "ce_specversion",
20+
/** corresponds to the CloudEvent#time */
21+
TIME: "ce_time",
22+
/** corresponds to the CloudEvent#subject */
23+
SUBJECT: "ce_subject",
24+
/** corresponds to the CloudEvent#datacontenttype */
25+
DATACONTENTTYPE: "ce_datacontenttype",
26+
/** corresponds to the CloudEvent#dataschema */
27+
DATASCHEMA: "ce_dataschema",
28+
} as const);
29+
30+
export const HEADER_MAP: { [key: string]: string } = {
31+
[KAFKA_CE_HEADERS.ID]: "id",
32+
[KAFKA_CE_HEADERS.TYPE]: "type",
33+
[KAFKA_CE_HEADERS.SOURCE]: "source",
34+
[KAFKA_CE_HEADERS.SPEC_VERSION]: "specversion",
35+
[KAFKA_CE_HEADERS.TIME]: "time",
36+
[KAFKA_CE_HEADERS.SUBJECT]: "subject",
37+
[KAFKA_CE_HEADERS.DATACONTENTTYPE]: "datacontenttype",
38+
[KAFKA_CE_HEADERS.DATASCHEMA]: "dataschema"
39+
};
40+
41+
/**
42+
* A conveninece function to convert a CloudEvent into headers
43+
* @param {CloudEvent} event a CloudEvent object
44+
* @returns {Headers} the CloudEvent attribute as Kafka headers
45+
*/
46+
export function headersFor<T>(event: CloudEvent<T>): Headers {
47+
const headers: Headers = {};
48+
49+
Object.getOwnPropertyNames(event).forEach((property) => {
50+
// Ignore the 'data' property
51+
// it becomes the Kafka message's 'value' field
52+
if (property != CONSTANTS.CE_ATTRIBUTES.DATA && property != CONSTANTS.STRUCTURED_ATTRS_1.DATA_BASE64) {
53+
// all CloudEvent property names get prefixed with 'ce_'
54+
// https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#3231-property-names
55+
headers[`ce_${property}`] = event[property] as string;
56+
}
57+
});
58+
59+
return headers;
60+
}

0 commit comments

Comments
 (0)