Skip to content

Commit

Permalink
feat: add support for catch-all listeners for outgoing packets
Browse files Browse the repository at this point in the history
This is similar to `onAny()`, but for outgoing packets.

Syntax:

```js
socket.onAnyOutgoing((event, ...args) => {
  console.log(event);
});
```
  • Loading branch information
darrachequesne committed Mar 31, 2022
1 parent 8b20457 commit 531104d
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 7 deletions.
2 changes: 1 addition & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
StrictEventEmitter,
EventNames,
} from "./typed-events";
import { patchAdapter, restoreAdapter, serveFile } from "./uws.js";
import { patchAdapter, restoreAdapter, serveFile } from "./uws";

const debug = debugModule("socket.io:server");

Expand Down
120 changes: 115 additions & 5 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export class Socket<
private fns: Array<(event: Event, next: (err?: Error) => void) => void> = [];
private flags: BroadcastFlags = {};
private _anyListeners?: Array<(...args: any[]) => void>;
private _anyOutgoingListeners?: Array<(...args: any[]) => void>;

/**
* Interface to a `Client` for a given `Namespace`.
Expand Down Expand Up @@ -220,6 +221,7 @@ export class Socket<
const flags = Object.assign({}, this.flags);
this.flags = {};

this.notifyOutgoingListeners(packet);
this.packet(packet, flags);

return true;
Expand Down Expand Up @@ -710,8 +712,8 @@ export class Socket<
}

/**
* Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the
* callback.
* Adds a listener that will be fired when any event is received. The event name is passed as the first argument to
* the callback.
*
* @param listener
* @public
Expand All @@ -723,8 +725,8 @@ export class Socket<
}

/**
* Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the
* callback. The listener is added to the beginning of the listeners array.
* Adds a listener that will be fired when any event is received. The event name is passed as the first argument to
* the callback. The listener is added to the beginning of the listeners array.
*
* @param listener
* @public
Expand All @@ -736,7 +738,7 @@ export class Socket<
}

/**
* Removes the listener that will be fired when any event is emitted.
* Removes the listener that will be fired when any event is received.
*
* @param listener
* @public
Expand Down Expand Up @@ -769,6 +771,114 @@ export class Socket<
return this._anyListeners || [];
}

/**
* Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the
* callback.
*
* @param listener
*
* <pre><code>
*
* socket.onAnyOutgoing((event, ...args) => {
* console.log(event);
* });
*
* </pre></code>
*
* @public
*/
public onAnyOutgoing(listener: (...args: any[]) => void): this {
this._anyOutgoingListeners = this._anyOutgoingListeners || [];
this._anyOutgoingListeners.push(listener);
return this;
}

/**
* Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the
* callback. The listener is added to the beginning of the listeners array.
*
* @param listener
*
* <pre><code>
*
* socket.prependAnyOutgoing((event, ...args) => {
* console.log(event);
* });
*
* </pre></code>
*
* @public
*/
public prependAnyOutgoing(listener: (...args: any[]) => void): this {
this._anyOutgoingListeners = this._anyOutgoingListeners || [];
this._anyOutgoingListeners.unshift(listener);
return this;
}

/**
* Removes the listener that will be fired when any event is emitted.
*
* @param listener
*
* <pre><code>
*
* const handler = (event, ...args) => {
* console.log(event);
* }
*
* socket.onAnyOutgoing(handler);
*
* // then later
* socket.offAnyOutgoing(handler);
*
* </pre></code>
*
* @public
*/
public offAnyOutgoing(listener?: (...args: any[]) => void): this {
if (!this._anyOutgoingListeners) {
return this;
}
if (listener) {
const listeners = this._anyOutgoingListeners;
for (let i = 0; i < listeners.length; i++) {
if (listener === listeners[i]) {
listeners.splice(i, 1);
return this;
}
}
} else {
this._anyOutgoingListeners = [];
}
return this;
}

/**
* Returns an array of listeners that are listening for any event that is specified. This array can be manipulated,
* e.g. to remove listeners.
*
* @public
*/
public listenersAnyOutgoing() {
return this._anyOutgoingListeners || [];
}

/**
* Notify the listeners for each packet sent (emit or broadcast)
*
* @param packet
*
* @private
*/
private notifyOutgoingListeners(packet: Packet) {
if (this._anyOutgoingListeners && this._anyOutgoingListeners.length) {
const listeners = this._anyOutgoingListeners.slice();
for (const listener of listeners) {
listener.apply(this, packet.data);
}
}
}

private newBroadcastOperator(): BroadcastOperator<EmitEvents, SocketData> {
const flags = Object.assign({}, this.flags);
this.flags = {};
Expand Down
96 changes: 95 additions & 1 deletion test/socket.io.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"use strict";

import { Server, Socket, Namespace } from "..";
import { Server, Socket, Namespace } from "../lib";
import { createServer } from "http";
import fs = require("fs");
import { join } from "path";
Expand Down Expand Up @@ -2082,6 +2082,100 @@ describe("socket.io", () => {
});
});
});

describe("onAnyOutgoing", () => {
it("should call listener", (done) => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(() => {
const clientSocket = client(srv, { multiplex: false });

sio.on("connection", (socket) => {
socket.onAnyOutgoing((event, arg1) => {
expect(event).to.be("my-event");
expect(arg1).to.be("123");

success(sio, clientSocket, done);
});

socket.emit("my-event", "123");
});
});
});

it("should call listener when broadcasting", (done) => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(() => {
const clientSocket = client(srv, { multiplex: false });

sio.on("connection", (socket) => {
socket.onAnyOutgoing((event, arg1) => {
expect(event).to.be("my-event");
expect(arg1).to.be("123");

success(sio, clientSocket, done);
});

sio.emit("my-event", "123");
});
});
});

it("should prepend listener", (done) => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(async () => {
const clientSocket = client(srv, { multiplex: false });

const socket = (await waitFor(sio, "connection")) as Socket;

let count = 0;

socket.onAnyOutgoing((event, arg1) => {
expect(count).to.be(2);

success(sio, clientSocket, done);
});

socket.prependAnyOutgoing(() => {
expect(count++).to.be(1);
});

socket.prependAnyOutgoing(() => {
expect(count++).to.be(0);
});

socket.emit("my-event", "123");
});
});

it("should remove listener", (done) => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(() => {
const clientSocket = client(srv, { multiplex: false });

sio.on("connection", (socket) => {
const fail = () => done(new Error("fail"));

socket.onAnyOutgoing(fail);
socket.offAnyOutgoing(fail);
expect(socket.listenersAnyOutgoing.length).to.be(0);

socket.onAnyOutgoing(() => {
success(sio, clientSocket, done);
});

socket.emit("my-event", "123");
});
});
});
});
});

describe("messaging many", () => {
Expand Down

0 comments on commit 531104d

Please # to comment.