From fc21c4a05f9d50d7efd62aa7a937fadce385e919 Mon Sep 17 00:00:00 2001 From: Jonathan Perret Date: Thu, 13 Jun 2024 23:02:22 +0200 Subject: [PATCH] fix: fix `websocket` and `webtransport` send callbacks (#699) With the `websocket` transport, the callbacks which indicate that the packets are actually written were not properly called. Example: ```js socket.send("hello", () => { // the message has been written to the underlying transport }); ``` The bug was caused by the `websocket` transport (and `webtransport` as well) having its `supportsFraming` property set to `true`, despite having been changed in [1] to emit a single `drain` event for each batch of messages written to the transport like the `polling` transport always did. Note that although [1] is partially reverted in [2], the new `drain` event behavior is preserved as called out in that commit's message. The `supportsFraming` attribute was introduced in [3] (amended by [4]) as a way to distinguish transports that emit one `drain` per message from those that emit one `drain` per batch. Since the delivery of `send` callbacks depends on matching `drain` events with `transport.send` calls, that distinction is vital to correct behavior. However, now that all transports have converged to "one `drain` per batch" behavior, this `supportsFraming` property can be retired (and the code for calling callbacks simplified). [1]: https://github.com/socketio/engine.io/pull/618 [2]: https://github.com/socketio/engine.io/commit/a65a047526401bebaa113a8c70d03f5d963eaa54 [3]: https://github.com/socketio/engine.io/pull/130 [4]: https://github.com/socketio/engine.io/pull/132 Related: https://github.com/socketio/engine.io/issues/698 --- lib/socket.ts | 38 ++++++++++++--------------------- lib/transport.ts | 5 ----- lib/transports-uws/polling.ts | 4 ---- lib/transports-uws/websocket.ts | 9 -------- lib/transports/polling.ts | 4 ---- lib/transports/websocket.ts | 9 -------- lib/transports/webtransport.ts | 4 ---- test/server.js | 14 ++++++++++-- test/webtransport.mjs | 15 +++++++++++++ 9 files changed, 41 insertions(+), 61 deletions(-) diff --git a/lib/socket.ts b/lib/socket.ts index 59814dee..b8ff48aa 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -14,6 +14,8 @@ export interface SendOptions { type ReadyState = "opening" | "open" | "closing" | "closed"; +type SendCallback = (transport: Transport) => void; + export class Socket extends EventEmitter { public readonly protocol: number; // TODO for the next major release: do not keep the reference to the first HTTP request, as it stays in memory @@ -27,8 +29,8 @@ export class Socket extends EventEmitter { private upgrading = false; private upgraded = false; private writeBuffer: Packet[] = []; - private packetsFn: Array<() => void> = []; - private sentCallbackFn: any[] = []; + private packetsFn: SendCallback[] = []; + private sentCallbackFn: SendCallback[][] = []; private cleanupFn: any[] = []; private pingTimeoutTimer; private pingIntervalTimer; @@ -395,19 +397,11 @@ export class Socket extends EventEmitter { // the message was sent successfully, execute the callback const onDrain = () => { if (this.sentCallbackFn.length > 0) { - const seqFn = this.sentCallbackFn.splice(0, 1)[0]; - if ("function" === typeof seqFn) { - debug("executing send callback"); - seqFn(this.transport); - } else if (Array.isArray(seqFn)) { - debug("executing batch send callback"); - const l = seqFn.length; - let i = 0; - for (; i < l; i++) { - if ("function" === typeof seqFn[i]) { - seqFn[i](this.transport); - } - } + debug("executing batch send callback"); + const seqFn = this.sentCallbackFn.shift(); + const l = seqFn.length; + for (let i = 0; i < l; i++) { + seqFn[i](this.transport); } } }; @@ -428,7 +422,7 @@ export class Socket extends EventEmitter { * @return {Socket} for chaining * @api public */ - public send(data: RawData, options?: SendOptions, callback?: () => void) { + public send(data: RawData, options?: SendOptions, callback?: SendCallback) { this.sendPacket("message", data, options, callback); return this; } @@ -440,7 +434,7 @@ export class Socket extends EventEmitter { * @param options * @param callback */ - public write(data: RawData, options?: SendOptions, callback?: () => void) { + public write(data: RawData, options?: SendOptions, callback?: SendCallback) { this.sendPacket("message", data, options, callback); return this; } @@ -459,7 +453,7 @@ export class Socket extends EventEmitter { type: PacketType, data?: RawData, options: SendOptions = {}, - callback?: () => void + callback?: SendCallback ) { if ("function" === typeof options) { callback = options; @@ -485,7 +479,7 @@ export class Socket extends EventEmitter { this.writeBuffer.push(packet); // add send callback to object, if defined - if (callback) this.packetsFn.push(callback); + if ("function" === typeof callback) this.packetsFn.push(callback); this.flush(); } @@ -507,11 +501,7 @@ export class Socket extends EventEmitter { this.server.emit("flush", this, this.writeBuffer); const wbuf = this.writeBuffer; this.writeBuffer = []; - if (!this.transport.supportsFraming) { - this.sentCallbackFn.push(this.packetsFn); - } else { - this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn); - } + this.sentCallbackFn.push(this.packetsFn); this.packetsFn = []; this.transport.send(wbuf); this.emit("drain"); diff --git a/lib/transport.ts b/lib/transport.ts index b0777bca..6318fb01 100644 --- a/lib/transport.ts +++ b/lib/transport.ts @@ -137,11 +137,6 @@ export abstract class Transport extends EventEmitter { this.emit("close"); } - /** - * Advertise framing support. - */ - abstract get supportsFraming(); - /** * The name of the transport. */ diff --git a/lib/transports-uws/polling.ts b/lib/transports-uws/polling.ts index 16e72c00..3fa30d90 100644 --- a/lib/transports-uws/polling.ts +++ b/lib/transports-uws/polling.ts @@ -42,10 +42,6 @@ export class Polling extends Transport { return "polling"; } - get supportsFraming() { - return false; - } - /** * Overrides onRequest. * diff --git a/lib/transports-uws/websocket.ts b/lib/transports-uws/websocket.ts index 9fc43ce3..4de3c23e 100644 --- a/lib/transports-uws/websocket.ts +++ b/lib/transports-uws/websocket.ts @@ -37,15 +37,6 @@ export class WebSocket extends Transport { return true; } - /** - * Advertise framing support. - * - * @api public - */ - get supportsFraming() { - return true; - } - /** * Writes a packet payload. * diff --git a/lib/transports/polling.ts b/lib/transports/polling.ts index e5ea24cf..598315b7 100644 --- a/lib/transports/polling.ts +++ b/lib/transports/polling.ts @@ -42,10 +42,6 @@ export class Polling extends Transport { return "polling"; } - get supportsFraming() { - return false; - } - /** * Overrides onRequest. * diff --git a/lib/transports/websocket.ts b/lib/transports/websocket.ts index 57c4a7b3..1f98fb7d 100644 --- a/lib/transports/websocket.ts +++ b/lib/transports/websocket.ts @@ -45,15 +45,6 @@ export class WebSocket extends Transport { return true; } - /** - * Advertise framing support. - * - * @api public - */ - get supportsFraming() { - return true; - } - /** * Writes a packet payload. * diff --git a/lib/transports/webtransport.ts b/lib/transports/webtransport.ts index 5922fab0..663a0f66 100644 --- a/lib/transports/webtransport.ts +++ b/lib/transports/webtransport.ts @@ -44,10 +44,6 @@ export class WebTransport extends Transport { return "webtransport"; } - get supportsFraming() { - return true; - } - async send(packets) { this.writable = false; diff --git a/test/server.js b/test/server.js index a373837e..d0301308 100644 --- a/test/server.js +++ b/test/server.js @@ -2759,13 +2759,23 @@ describe("server", () => { }); }); - it("should execute in multipart packet", (done) => { + it("should execute in multipart packet (websocket)", (done) => { const engine = listen((port) => { - const socket = new ClientSocket(`ws://localhost:${port}`); + const socket = new ClientSocket(`ws://localhost:${port}`, { + transports: ["websocket"], + }); let i = 0; let j = 0; engine.on("connection", (conn) => { + conn.send("d", (transport) => { + i++; + }); + + conn.send("c", (transport) => { + i++; + }); + conn.send("b", (transport) => { i++; }); diff --git a/test/webtransport.mjs b/test/webtransport.mjs index 3d54e554..10948ba2 100644 --- a/test/webtransport.mjs +++ b/test/webtransport.mjs @@ -373,6 +373,21 @@ describe("WebTransport", () => { }); }); + it("should invoke send callbacks (server to client)", (done) => { + setup({}, async ({ engine, h3Server, socket, reader }) => { + const messageCount = 4; + let receivedCallbacks = 0; + + for (let i = 0; i < messageCount; i++) { + socket.send("hello", () => { + if (++receivedCallbacks === messageCount) { + success(engine, h3Server, done); + } + }); + } + }); + }); + it("should send some binary data (client to server)", (done) => { setup({}, async ({ engine, h3Server, socket, writer }) => { socket.on("data", (data) => {