Skip to content

Commit c333723

Browse files
nbbeekenW-A-James
andcommitted
feat(NODE-5682): set maxTimeMS on commands and preempt I/O (#4174)
Co-authored-by: Warren James <warren.james@mongodb.com>
1 parent 398066e commit c333723

File tree

16 files changed

+200
-77
lines changed

16 files changed

+200
-77
lines changed

src/admin.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,10 @@ export class Admin {
155155
* @param options - Optional settings for the command
156156
*/
157157
async listDatabases(options?: ListDatabasesOptions): Promise<ListDatabasesResult> {
158-
return await executeOperation(this.s.db.client, new ListDatabasesOperation(this.s.db, options));
158+
return await executeOperation(
159+
this.s.db.client,
160+
new ListDatabasesOperation(this.s.db, { timeoutMS: this.s.db.timeoutMS, ...options })
161+
);
159162
}
160163

161164
/**

src/cmap/connection.ts

+58-8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
MongoMissingDependencyError,
2020
MongoNetworkError,
2121
MongoNetworkTimeoutError,
22+
MongoOperationTimeoutError,
2223
MongoParseError,
2324
MongoServerError,
2425
MongoUnexpectedServerResponseError
@@ -30,7 +31,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
3031
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3132
import { ServerType } from '../sdam/common';
3233
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
33-
import { type TimeoutContext } from '../timeout';
34+
import { type TimeoutContext, TimeoutError } from '../timeout';
3435
import {
3536
BufferPool,
3637
calculateDurationInMs,
@@ -419,6 +420,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
419420
...options
420421
};
421422

423+
if (options.timeoutContext?.csotEnabled()) {
424+
const { maxTimeMS } = options.timeoutContext;
425+
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
426+
}
427+
422428
const message = this.supportsOpMsg
423429
? new OpMsgRequest(db, cmd, commandOptions)
424430
: new OpQueryRequest(db, cmd, commandOptions);
@@ -433,7 +439,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
433439
): AsyncGenerator<MongoDBResponse> {
434440
this.throwIfAborted();
435441

436-
if (typeof options.socketTimeoutMS === 'number') {
442+
if (options.timeoutContext?.csotEnabled()) {
443+
this.socket.setTimeout(0);
444+
} else if (typeof options.socketTimeoutMS === 'number') {
437445
this.socket.setTimeout(options.socketTimeoutMS);
438446
} else if (this.socketTimeoutMS !== 0) {
439447
this.socket.setTimeout(this.socketTimeoutMS);
@@ -442,7 +450,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
442450
try {
443451
await this.writeCommand(message, {
444452
agreedCompressor: this.description.compressor ?? 'none',
445-
zlibCompressionLevel: this.description.zlibCompressionLevel
453+
zlibCompressionLevel: this.description.zlibCompressionLevel,
454+
timeoutContext: options.timeoutContext
446455
});
447456

448457
if (options.noResponse || message.moreToCome) {
@@ -452,7 +461,17 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
452461

453462
this.throwIfAborted();
454463

455-
for await (const response of this.readMany()) {
464+
if (
465+
options.timeoutContext?.csotEnabled() &&
466+
options.timeoutContext.minRoundTripTime != null &&
467+
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
468+
) {
469+
throw new MongoOperationTimeoutError(
470+
'Server roundtrip time is greater than the time remaining'
471+
);
472+
}
473+
474+
for await (const response of this.readMany({ timeoutContext: options.timeoutContext })) {
456475
this.socket.setTimeout(0);
457476
const bson = response.parse();
458477

@@ -629,7 +648,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
629648
*/
630649
private async writeCommand(
631650
command: WriteProtocolMessageType,
632-
options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number }
651+
options: {
652+
agreedCompressor?: CompressorName;
653+
zlibCompressionLevel?: number;
654+
timeoutContext?: TimeoutContext;
655+
}
633656
): Promise<void> {
634657
const finalCommand =
635658
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
@@ -641,8 +664,32 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
641664

642665
const buffer = Buffer.concat(await finalCommand.toBin());
643666

667+
if (options.timeoutContext?.csotEnabled()) {
668+
if (
669+
options.timeoutContext.minRoundTripTime != null &&
670+
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
671+
) {
672+
throw new MongoOperationTimeoutError(
673+
'Server roundtrip time is greater than the time remaining'
674+
);
675+
}
676+
}
677+
644678
if (this.socket.write(buffer)) return;
645-
return await once(this.socket, 'drain');
679+
680+
const drainEvent = once<void>(this.socket, 'drain');
681+
const timeout = options?.timeoutContext?.timeoutForSocketWrite;
682+
if (timeout) {
683+
try {
684+
return await Promise.race([drainEvent, timeout]);
685+
} catch (error) {
686+
if (TimeoutError.is(error)) {
687+
throw new MongoOperationTimeoutError('Timed out at socket write');
688+
}
689+
throw error;
690+
}
691+
}
692+
return await drainEvent;
646693
}
647694

648695
/**
@@ -654,10 +701,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
654701
*
655702
* Note that `for-await` loops call `return` automatically when the loop is exited.
656703
*/
657-
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
704+
private async *readMany(options: {
705+
timeoutContext?: TimeoutContext;
706+
}): AsyncGenerator<OpMsgResponse | OpReply> {
658707
try {
659-
this.dataEvents = onData(this.messageStream);
708+
this.dataEvents = onData(this.messageStream, options);
660709
this.messageStream.resume();
710+
661711
for await (const message of this.dataEvents) {
662712
const response = await decompressResponse(message);
663713
yield response;

src/cmap/wire_protocol/on_data.ts

+14-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { type EventEmitter } from 'events';
22

3+
import { MongoOperationTimeoutError } from '../../error';
4+
import { type TimeoutContext, TimeoutError } from '../../timeout';
35
import { List, promiseWithResolvers } from '../../utils';
46

57
/**
@@ -18,7 +20,10 @@ type PendingPromises = Omit<
1820
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
1921
* It will reject upon an error event.
2022
*/
21-
export function onData(emitter: EventEmitter) {
23+
export function onData(
24+
emitter: EventEmitter,
25+
{ timeoutContext }: { timeoutContext?: TimeoutContext }
26+
) {
2227
// Setup pending events and pending promise lists
2328
/**
2429
* When the caller has not yet called .next(), we store the
@@ -86,6 +91,8 @@ export function onData(emitter: EventEmitter) {
8691
// Adding event handlers
8792
emitter.on('data', eventHandler);
8893
emitter.on('error', errorHandler);
94+
// eslint-disable-next-line github/no-then
95+
timeoutContext?.timeoutForSocketRead?.then(undefined, errorHandler);
8996

9097
return iterator;
9198

@@ -97,8 +104,12 @@ export function onData(emitter: EventEmitter) {
97104

98105
function errorHandler(err: Error) {
99106
const promise = unconsumedPromises.shift();
100-
if (promise != null) promise.reject(err);
101-
else error = err;
107+
const timeoutError = TimeoutError.is(err)
108+
? new MongoOperationTimeoutError('Timed out during socket read')
109+
: undefined;
110+
111+
if (promise != null) promise.reject(timeoutError ?? err);
112+
else error = timeoutError ?? err;
102113
void closeHandler();
103114
}
104115

src/db.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ export class Db {
277277
this.client,
278278
new RunCommandOperation(this, command, {
279279
...resolveBSONOptions(options),
280-
timeoutMS: options?.timeoutMS,
280+
timeoutMS: options?.timeoutMS ?? this.timeoutMS,
281281
session: options?.session,
282282
readPreference: options?.readPreference
283283
})

src/sdam/topology.ts

+10-7
Original file line numberDiff line numberDiff line change
@@ -460,29 +460,28 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
460460
}
461461
}
462462

463-
const timeoutMS = this.client.s.options.timeoutMS;
463+
// TODO(NODE-6223): auto connect cannot use timeoutMS
464+
// const timeoutMS = this.client.s.options.timeoutMS;
464465
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
465466
const readPreference = options.readPreference ?? ReadPreference.primary;
466-
467467
const timeoutContext = TimeoutContext.create({
468-
timeoutMS,
468+
timeoutMS: undefined,
469469
serverSelectionTimeoutMS,
470470
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
471471
});
472-
473472
const selectServerOptions = {
474473
operationName: 'ping',
475474
...options,
476475
timeoutContext
477476
};
477+
478478
try {
479479
const server = await this.selectServer(
480480
readPreferenceServerSelector(readPreference),
481481
selectServerOptions
482482
);
483-
484483
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
485-
if (!skipPingOnConnect && server && this.s.credentials) {
484+
if (!skipPingOnConnect && this.s.credentials) {
486485
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext });
487486
stateTransition(this, STATE_CONNECTED);
488487
this.emit(Topology.OPEN, this);
@@ -623,7 +622,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
623622

624623
try {
625624
timeout?.throwIfExpired();
626-
return await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
625+
const server = await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
626+
if (options.timeoutContext?.csotEnabled() && server.description.minRoundTripTime !== 0) {
627+
options.timeoutContext.minRoundTripTime = server.description.minRoundTripTime;
628+
}
629+
return server;
627630
} catch (error) {
628631
if (TimeoutError.is(error)) {
629632
// Timeout

src/timeout.ts

+36-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { clearTimeout, setTimeout } from 'timers';
22

3-
import { MongoInvalidArgumentError, MongoRuntimeError } from './error';
3+
import { MongoInvalidArgumentError, MongoOperationTimeoutError, MongoRuntimeError } from './error';
44
import { csotMin, noop } from './utils';
55

66
/** @internal */
@@ -51,7 +51,7 @@ export class Timeout extends Promise<never> {
5151
}
5252

5353
/** Create a new timeout that expires in `duration` ms */
54-
private constructor(executor: Executor = () => null, duration: number, unref = false) {
54+
private constructor(executor: Executor = () => null, duration: number, unref = true) {
5555
let reject!: Reject;
5656

5757
if (duration < 0) {
@@ -163,6 +163,10 @@ export abstract class TimeoutContext {
163163

164164
abstract get clearConnectionCheckoutTimeout(): boolean;
165165

166+
abstract get timeoutForSocketWrite(): Timeout | null;
167+
168+
abstract get timeoutForSocketRead(): Timeout | null;
169+
166170
abstract csotEnabled(): this is CSOTTimeoutContext;
167171
}
168172

@@ -175,13 +179,15 @@ export class CSOTTimeoutContext extends TimeoutContext {
175179
clearConnectionCheckoutTimeout: boolean;
176180
clearServerSelectionTimeout: boolean;
177181

178-
private _maxTimeMS?: number;
179-
180182
private _serverSelectionTimeout?: Timeout | null;
181183
private _connectionCheckoutTimeout?: Timeout | null;
184+
public minRoundTripTime = 0;
185+
private start: number;
182186

183187
constructor(options: CSOTTimeoutContextOptions) {
184188
super();
189+
this.start = Math.trunc(performance.now());
190+
185191
this.timeoutMS = options.timeoutMS;
186192

187193
this.serverSelectionTimeoutMS = options.serverSelectionTimeoutMS;
@@ -193,11 +199,12 @@ export class CSOTTimeoutContext extends TimeoutContext {
193199
}
194200

195201
get maxTimeMS(): number {
196-
return this._maxTimeMS ?? -1;
202+
return this.remainingTimeMS - this.minRoundTripTime;
197203
}
198204

199-
set maxTimeMS(v: number) {
200-
this._maxTimeMS = v;
205+
get remainingTimeMS() {
206+
const timePassed = Math.trunc(performance.now()) - this.start;
207+
return this.timeoutMS <= 0 ? Infinity : this.timeoutMS - timePassed;
201208
}
202209

203210
csotEnabled(): this is CSOTTimeoutContext {
@@ -238,6 +245,20 @@ export class CSOTTimeoutContext extends TimeoutContext {
238245
}
239246
return this._connectionCheckoutTimeout;
240247
}
248+
249+
get timeoutForSocketWrite(): Timeout | null {
250+
const { remainingTimeMS } = this;
251+
if (!Number.isFinite(remainingTimeMS)) return null;
252+
if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS);
253+
throw new MongoOperationTimeoutError('Timed out before socket write');
254+
}
255+
256+
get timeoutForSocketRead(): Timeout | null {
257+
const { remainingTimeMS } = this;
258+
if (!Number.isFinite(remainingTimeMS)) return null;
259+
if (remainingTimeMS > 0) return Timeout.expires(remainingTimeMS);
260+
throw new MongoOperationTimeoutError('Timed out before socket read');
261+
}
241262
}
242263

243264
/** @internal */
@@ -268,4 +289,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
268289
return Timeout.expires(this.options.waitQueueTimeoutMS);
269290
return null;
270291
}
292+
293+
get timeoutForSocketWrite(): Timeout | null {
294+
return null;
295+
}
296+
297+
get timeoutForSocketRead(): Timeout | null {
298+
return null;
299+
}
271300
}

test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts

+12-8
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ describe('CSOT spec prose tests', function () {
384384
clock.restore();
385385
});
386386

387-
it('serverSelectionTimeoutMS honored if timeoutMS is not set', async function () {
387+
it.skip('serverSelectionTimeoutMS honored if timeoutMS is not set', async function () {
388388
/**
389389
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?serverSelectionTimeoutMS=10`.
390390
* 1. Using `client`, execute the command `{ ping: 1 }` against the `admin` database.
@@ -416,10 +416,11 @@ describe('CSOT spec prose tests', function () {
416416

417417
await clock.tickAsync(11);
418418
expect(await maybeError).to.be.instanceof(MongoServerSelectionError);
419-
});
419+
}).skipReason =
420+
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';
420421
});
421422

422-
it("timeoutMS honored for server selection if it's lower than serverSelectionTimeoutMS", async function () {
423+
it.skip("timeoutMS honored for server selection if it's lower than serverSelectionTimeoutMS", async function () {
423424
/**
424425
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=10&serverSelectionTimeoutMS=20`.
425426
* 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database.
@@ -440,9 +441,10 @@ describe('CSOT spec prose tests', function () {
440441

441442
expect(maybeError).to.be.instanceof(MongoOperationTimeoutError);
442443
expect(end - start).to.be.lte(15);
443-
});
444+
}).skipReason =
445+
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';
444446

445-
it("serverSelectionTimeoutMS honored for server selection if it's lower than timeoutMS", async function () {
447+
it.skip("serverSelectionTimeoutMS honored for server selection if it's lower than timeoutMS", async function () {
446448
/**
447449
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=20&serverSelectionTimeoutMS=10`.
448450
* 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database.
@@ -462,9 +464,10 @@ describe('CSOT spec prose tests', function () {
462464

463465
expect(maybeError).to.be.instanceof(MongoOperationTimeoutError);
464466
expect(end - start).to.be.lte(15);
465-
});
467+
}).skipReason =
468+
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';
466469

467-
it('serverSelectionTimeoutMS honored for server selection if timeoutMS=0', async function () {
470+
it.skip('serverSelectionTimeoutMS honored for server selection if timeoutMS=0', async function () {
468471
/**
469472
* 1. Create a MongoClient (referred to as `client`) with URI `mongodb://invalid/?timeoutMS=0&serverSelectionTimeoutMS=10`.
470473
* 1. Using `client`, run the command `{ ping: 1 }` against the `admin` database.
@@ -484,7 +487,8 @@ describe('CSOT spec prose tests', function () {
484487

485488
expect(maybeError).to.be.instanceof(MongoOperationTimeoutError);
486489
expect(end - start).to.be.lte(15);
487-
});
490+
}).skipReason =
491+
'TODO(NODE-6223): Auto connect performs extra server selection. Explicit connect throws on invalid host name';
488492

489493
it.skip("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", async function () {
490494
/**

0 commit comments

Comments
 (0)