Skip to content

Commit

Permalink
perf(websocket): use bound callbacks
Browse files Browse the repository at this point in the history
Instead of allocating one temporary function for each WebSocket
`send()` call.

Regarding the test removal, the permessage-deflate threshold was
implemented in the "ws" package in [1], so it's not needed anymore.

[1]: websockets/ws@6b3904b
  • Loading branch information
darrachequesne committed Jun 17, 2024
1 parent 62f59b6 commit 9a68c8c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 105 deletions.
81 changes: 38 additions & 43 deletions lib/transports/websocket.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Transport } from "../transport";
import debugModule from "debug";
import type { Packet, RawData } from "engine.io-parser";

const debug = debugModule("engine:ws");

Expand Down Expand Up @@ -45,52 +46,27 @@ export class WebSocket extends Transport {
return true;
}

/**
* Writes a packet payload.
*
* @param {Array} packets
* @api private
*/
send(packets) {
send(packets: Packet[]) {
this.writable = false;

for (let i = 0; i < packets.length; i++) {
const packet = packets[i];
const isLast = i + 1 === packets.length;

// always creates a new object since ws modifies it
const opts: { compress?: boolean } = {};
if (packet.options) {
opts.compress = packet.options.compress;
}

const onSent = (err) => {
if (err) {
return this.onError("write error", err.stack);
} else if (isLast) {
this.writable = true;
this.emit("drain");
}
};

const send = (data) => {
if (this.perMessageDeflate) {
const len =
"string" === typeof data ? Buffer.byteLength(data) : data.length;
if (len < this.perMessageDeflate.threshold) {
opts.compress = false;
}
}
debug('writing "%s"', data);
this.socket.send(data, opts, onSent);
};

if (this._canSendPreEncodedFrame(packet)) {
// the WebSocket frame was computed with WebSocket.Sender.frame()
// see https://github.com/websockets/ws/issues/617#issuecomment-283002469
this.socket._sender.sendFrame(packet.options.wsPreEncodedFrame, onSent);
this.socket._sender.sendFrame(
// @ts-ignore
packet.options.wsPreEncodedFrame,
isLast ? this._onSentLast : this._onSent
);
} else {
this.parser.encodePacket(packet, this.supportsBinary, send);
this.parser.encodePacket(
packet,
this.supportsBinary,
isLast ? this._doSendLast : this._doSend
);
}
}
}
Expand All @@ -100,20 +76,39 @@ export class WebSocket extends Transport {
* @param packet
* @private
*/
private _canSendPreEncodedFrame(packet) {
private _canSendPreEncodedFrame(packet: Packet) {
return (
!this.perMessageDeflate &&
typeof this.socket?._sender?.sendFrame === "function" &&
// @ts-ignore
packet.options?.wsPreEncodedFrame !== undefined
);
}

/**
* Closes the transport.
*
* @api private
*/
doClose(fn) {
private _doSend = (data: RawData) => {
this.socket.send(data, this._onSent);
};

private _doSendLast = (data: RawData) => {
this.socket.send(data, this._onSentLast);
};

private _onSent = (err?: Error) => {
if (err) {
this.onError("write error", err.stack);
}
};

private _onSentLast = (err?: Error) => {
if (err) {
this.onError("write error", err.stack);
} else {
this.writable = true;
this.emit("drain");
}
};

doClose(fn?: () => void) {
debug("closing");
this.socket.close();
fn && fn();
Expand Down
62 changes: 0 additions & 62 deletions test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3337,68 +3337,6 @@ describe("server", () => {
});
});

describe("permessage-deflate", () => {
it("should set threshold", function (done) {
if (process.env.EIO_WS_ENGINE === "uws") {
return this.skip();
}
const engine = listen(
{ transports: ["websocket"], perMessageDeflate: { threshold: 0 } },
(port) => {
engine.on("connection", (conn) => {
const socket = conn.transport.socket;
const send = socket.send;
socket.send = (data, opts, callback) => {
socket.send = send;
socket.send(data, opts, callback);

expect(opts.compress).to.be(true);
conn.close();
done();
};

const buf = Buffer.allocUnsafe(100);
for (let i = 0; i < buf.length; i++) buf[i] = i % 0xff;
conn.send(buf, { compress: true });
});
new ClientSocket(`http://localhost:${port}`, {
transports: ["websocket"],
});
}
);
});

it("should not compress when the byte size is below threshold", function (done) {
if (process.env.EIO_WS_ENGINE === "uws") {
return this.skip();
}
const engine = listen(
{ transports: ["websocket"], perMessageDeflate: true },
(port) => {
engine.on("connection", (conn) => {
const socket = conn.transport.socket;
const send = socket.send;
socket.send = (data, opts, callback) => {
socket.send = send;
socket.send(data, opts, callback);

expect(opts.compress).to.be(false);
conn.close();
done();
};

const buf = Buffer.allocUnsafe(100);
for (let i = 0; i < buf.length; i++) buf[i] = i % 0xff;
conn.send(buf, { compress: true });
});
new ClientSocket(`http://localhost:${port}`, {
transports: ["websocket"],
});
}
);
});
});

describe("extraHeaders", function () {
this.timeout(5000);

Expand Down

0 comments on commit 9a68c8c

Please # to comment.