diff --git a/lib/transports/websocket.ts b/lib/transports/websocket.ts index 83d4de57..e4cc2623 100644 --- a/lib/transports/websocket.ts +++ b/lib/transports/websocket.ts @@ -1,5 +1,6 @@ import { Transport } from "../transport"; import debugModule from "debug"; +import type { Packet, RawData } from "engine.io-parser"; const debug = debugModule("engine:ws"); @@ -45,52 +46,27 @@ export class WebSocket extends Transport { return true; } - /** - * Writes a packet payload. - * - * @param {Array} packets - * @api private - */ - send(packets) { + send(packets: Packet[]) { this.writable = false; for (let i = 0; i < packets.length; i++) { const packet = packets[i]; const isLast = i + 1 === packets.length; - // always creates a new object since ws modifies it - const opts: { compress?: boolean } = {}; - if (packet.options) { - opts.compress = packet.options.compress; - } - - const onSent = (err) => { - if (err) { - return this.onError("write error", err.stack); - } else if (isLast) { - this.writable = true; - this.emit("drain"); - } - }; - - const send = (data) => { - if (this.perMessageDeflate) { - const len = - "string" === typeof data ? Buffer.byteLength(data) : data.length; - if (len < this.perMessageDeflate.threshold) { - opts.compress = false; - } - } - debug('writing "%s"', data); - this.socket.send(data, opts, onSent); - }; - if (this._canSendPreEncodedFrame(packet)) { // the WebSocket frame was computed with WebSocket.Sender.frame() // see https://github.com/websockets/ws/issues/617#issuecomment-283002469 - this.socket._sender.sendFrame(packet.options.wsPreEncodedFrame, onSent); + this.socket._sender.sendFrame( + // @ts-ignore + packet.options.wsPreEncodedFrame, + isLast ? this._onSentLast : this._onSent + ); } else { - this.parser.encodePacket(packet, this.supportsBinary, send); + this.parser.encodePacket( + packet, + this.supportsBinary, + isLast ? this._doSendLast : this._doSend + ); } } } @@ -100,20 +76,39 @@ export class WebSocket extends Transport { * @param packet * @private */ - private _canSendPreEncodedFrame(packet) { + private _canSendPreEncodedFrame(packet: Packet) { return ( !this.perMessageDeflate && typeof this.socket?._sender?.sendFrame === "function" && + // @ts-ignore packet.options?.wsPreEncodedFrame !== undefined ); } - /** - * Closes the transport. - * - * @api private - */ - doClose(fn) { + private _doSend = (data: RawData) => { + this.socket.send(data, this._onSent); + }; + + private _doSendLast = (data: RawData) => { + this.socket.send(data, this._onSentLast); + }; + + private _onSent = (err?: Error) => { + if (err) { + this.onError("write error", err.stack); + } + }; + + private _onSentLast = (err?: Error) => { + if (err) { + this.onError("write error", err.stack); + } else { + this.writable = true; + this.emit("drain"); + } + }; + + doClose(fn?: () => void) { debug("closing"); this.socket.close(); fn && fn(); diff --git a/test/server.js b/test/server.js index cacb6774..c7c914bb 100644 --- a/test/server.js +++ b/test/server.js @@ -3337,68 +3337,6 @@ describe("server", () => { }); }); - describe("permessage-deflate", () => { - it("should set threshold", function (done) { - if (process.env.EIO_WS_ENGINE === "uws") { - return this.skip(); - } - const engine = listen( - { transports: ["websocket"], perMessageDeflate: { threshold: 0 } }, - (port) => { - engine.on("connection", (conn) => { - const socket = conn.transport.socket; - const send = socket.send; - socket.send = (data, opts, callback) => { - socket.send = send; - socket.send(data, opts, callback); - - expect(opts.compress).to.be(true); - conn.close(); - done(); - }; - - const buf = Buffer.allocUnsafe(100); - for (let i = 0; i < buf.length; i++) buf[i] = i % 0xff; - conn.send(buf, { compress: true }); - }); - new ClientSocket(`http://localhost:${port}`, { - transports: ["websocket"], - }); - } - ); - }); - - it("should not compress when the byte size is below threshold", function (done) { - if (process.env.EIO_WS_ENGINE === "uws") { - return this.skip(); - } - const engine = listen( - { transports: ["websocket"], perMessageDeflate: true }, - (port) => { - engine.on("connection", (conn) => { - const socket = conn.transport.socket; - const send = socket.send; - socket.send = (data, opts, callback) => { - socket.send = send; - socket.send(data, opts, callback); - - expect(opts.compress).to.be(false); - conn.close(); - done(); - }; - - const buf = Buffer.allocUnsafe(100); - for (let i = 0; i < buf.length; i++) buf[i] = i % 0xff; - conn.send(buf, { compress: true }); - }); - new ClientSocket(`http://localhost:${port}`, { - transports: ["websocket"], - }); - } - ); - }); - }); - describe("extraHeaders", function () { this.timeout(5000);