Skip to content

Commit

Permalink
fix: discard acknowledgements upon disconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Mar 14, 2024
1 parent 8cfea8c commit 5060b82
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 22 deletions.
93 changes: 72 additions & 21 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,33 @@ export class Socket<
private readonly _opts: SocketOptions;

private ids: number = 0;
private acks: object = {};
/**
* A map containing acknowledgement handlers.
*
* The `withError` attribute is used to differentiate handlers that accept an error as first argument:
*
* - `socket.emit("test", (err, value) => { ... })` with `ackTimeout` option
* - `socket.timeout(5000).emit("test", (err, value) => { ... })`
* - `const value = await socket.emitWithAck("test")`
*
* From those that don't:
*
* - `socket.emit("test", (value) => { ... });`
*
* In the first case, the handlers will be called with an error when:
*
* - the timeout is reached
* - the socket gets disconnected
*
* In the second case, the handlers will be simply discarded upon disconnection, since the client will never receive
* an acknowledgement from the server.
*
* @private
*/
private acks: Record<
string,
((...args: any[]) => void) & { withError?: boolean }
> = {};
private flags: Flags = {};
private subs?: Array<VoidFunction>;
private _anyListeners: Array<(...args: any[]) => void>;
Expand Down Expand Up @@ -409,7 +435,7 @@ export class Socket<
const id = this.ids++;
debug("emitting packet with ack id %d", id);

const ack = args.pop() as Function;
const ack = args.pop() as (...args: any[]) => void;
this._registerAckCallback(id, ack);
packet.id = id;
}
Expand Down Expand Up @@ -438,7 +464,7 @@ export class Socket<
/**
* @private
*/
private _registerAckCallback(id: number, ack: Function) {
private _registerAckCallback(id: number, ack: (...args: any[]) => void) {
const timeout = this.flags.timeout ?? this._opts.ackTimeout;
if (timeout === undefined) {
this.acks[id] = ack;
Expand All @@ -458,11 +484,14 @@ export class Socket<
ack.call(this, new Error("operation has timed out"));
}, timeout);

this.acks[id] = (...args) => {
const fn = (...args: any[]) => {
// @ts-ignore
this.io.clearTimeoutFn(timer);
ack.apply(this, [null, ...args]);
ack.apply(this, args);
};
fn.withError = true;

this.acks[id] = fn;
}

/**
Expand All @@ -485,17 +514,12 @@ export class Socket<
ev: Ev,
...args: AllButLast<EventParams<EmitEvents, Ev>>
): Promise<FirstArg<Last<EventParams<EmitEvents, Ev>>>> {
// the timeout flag is optional
const withErr =
this.flags.timeout !== undefined || this._opts.ackTimeout !== undefined;
return new Promise((resolve, reject) => {
args.push((arg1, arg2) => {
if (withErr) {
return arg1 ? reject(arg1) : resolve(arg2);
} else {
return resolve(arg1);
}
});
const fn = (arg1, arg2) => {
return arg1 ? reject(arg1) : resolve(arg2);
};
fn.withError = true;
args.push(fn);
this.emit(ev, ...(args as any[] as EventParams<EmitEvents, Ev>));
});
}
Expand Down Expand Up @@ -647,6 +671,28 @@ export class Socket<
this.connected = false;
delete this.id;
this.emitReserved("disconnect", reason, description);
this._clearAcks();
}

/**
* Clears the acknowledgement handlers upon disconnection, since the client will never receive an acknowledgement from
* the server.
*
* @private
*/
private _clearAcks() {
Object.keys(this.acks).forEach((id) => {
const isBuffered = this.sendBuffer.some(
(packet) => String(packet.id) === id
);
if (!isBuffered) {
// note: handlers that do not accept an error as first argument are ignored here
if (this.acks[id].withError) {
this.acks[id].call(this, new Error("socket has been disconnected"));
}
delete this.acks[id];
}
});
}

/**
Expand Down Expand Up @@ -756,20 +802,25 @@ export class Socket<
}

/**
* Called upon a server acknowlegement.
* Called upon a server acknowledgement.
*
* @param packet
* @private
*/
private onack(packet: Packet): void {
const ack = this.acks[packet.id];
if ("function" === typeof ack) {
debug("calling ack %s with %j", packet.id, packet.data);
ack.apply(this, packet.data);
delete this.acks[packet.id];
} else {
if (typeof ack !== "function") {
debug("bad ack %s", packet.id);
return;
}
delete this.acks[packet.id];
debug("calling ack %s with %j", packet.id, packet.data);
// @ts-ignore FIXME ack is inferred as 'never'
if (ack.withError) {
packet.data.unshift(null);
}
// @ts-ignore
ack.apply(this, packet.data);
}

/**
Expand Down
118 changes: 117 additions & 1 deletion test/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ describe("socket", () => {
});
});

it("should use the default value", () => {
it("should use the default timeout value", () => {
return wrap((done) => {
const socket = io(BASE_URL + "/", {
ackTimeout: 50,
Expand All @@ -663,5 +663,121 @@ describe("socket", () => {
});
});
});

describe("acknowledgement upon disconnection", () => {
it("should not ack upon disconnection (callback)", () => {
return wrap((done) => {
const socket = io(BASE_URL, {
forceNew: true,
});

socket.on("connect", () => {
socket.emit("echo", "a", (_value) => {
done(new Error("should not happen"));
});

socket.disconnect();

setTimeout(() => success(done, socket), 100);
});
});
});

it("should ack with an error upon disconnection (callback & timeout)", () => {
return wrap((done) => {
const socket = io(BASE_URL, {
forceNew: true,
});

socket.on("connect", () => {
socket.timeout(10_000).emit("echo", "a", (err) => {
expect(err).to.be.an(Error);

success(done, socket);
});

socket.disconnect();
});
});
});

it("should ack with an error upon disconnection (callback & ackTimeout)", () => {
return wrap((done) => {
const socket = io(BASE_URL, {
forceNew: true,
ackTimeout: 10_000,
});

socket.on("connect", () => {
socket.emit("echo", "a", (err) => {
expect(err).to.be.an(Error);

success(done, socket);
});

socket.disconnect();
});
});
});

it("should ack with an error upon disconnection (promise)", () => {
return wrap((done) => {
const socket = io(BASE_URL, {
forceNew: true,
});

socket.on("connect", () => {
socket.emitWithAck("echo", "a").catch((err) => {
expect(err).to.be.an(Error);

success(done, socket);
});

socket.disconnect();
});
});
});

it("should ack with an error upon disconnection (promise & timeout)", () => {
return wrap((done) => {
const socket = io(BASE_URL, {
forceNew: true,
});

socket.on("connect", () => {
socket
.timeout(10_000)
.emitWithAck("echo", "a")
.catch((err) => {
expect(err).to.be.an(Error);

success(done, socket);
});

socket.disconnect();
});
});
});

it("should not discard an unsent ack (callback)", () => {
return wrap((done) => {
const socket = io(BASE_URL, {
forceNew: true,
});

socket.once("connect", () => {
socket.disconnect();

socket.emit("echo", "a", (value) => {
expect(value).to.eql("a");

success(done, socket);
});

setTimeout(() => socket.connect(), 100);
});
});
});
});
});
});

0 comments on commit 5060b82

Please # to comment.