Skip to content

Commit

Permalink
fix(NODE-6454): use timeoutcontext for state machine execute() cursor…
Browse files Browse the repository at this point in the history
… options (#4291)
  • Loading branch information
baileympearson authored and dariakp committed Nov 6, 2024
1 parent f745b99 commit 5dd8ee5
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 52 deletions.
30 changes: 14 additions & 16 deletions src/client-side-encryption/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
serialize
} from '../bson';
import { type ProxyOptions } from '../cmap/connection';
import { CursorTimeoutContext } from '../cursor/abstract_cursor';
import { getSocks, type SocksLib } from '../deps';
import { MongoOperationTimeoutError } from '../error';
import { type MongoClient, type MongoClientOptions } from '../mongo_client';
Expand Down Expand Up @@ -519,16 +520,16 @@ export class StateMachine {
): Promise<Uint8Array | null> {
const { db } = MongoDBCollectionNamespace.fromString(ns);

const collections = await client
.db(db)
.listCollections(filter, {
promoteLongs: false,
promoteValues: false,
...(timeoutContext?.csotEnabled()
? { timeoutMS: timeoutContext?.remainingTimeMS, timeoutMode: 'cursorLifetime' }
: {})
})
.toArray();
const cursor = client.db(db).listCollections(filter, {
promoteLongs: false,
promoteValues: false,
timeoutContext: timeoutContext && new CursorTimeoutContext(timeoutContext, Symbol())
});

// There is always exactly zero or one matching documents, so this should always exhaust the cursor
// in a single batch. We call `toArray()` just to be safe and ensure that the cursor is always
// exhausted and closed.
const collections = await cursor.toArray();

const info = collections.length > 0 ? serialize(collections[0]) : null;
return info;
Expand Down Expand Up @@ -582,12 +583,9 @@ export class StateMachine {
return client
.db(dbName)
.collection<DataKey>(collectionName, { readConcern: { level: 'majority' } })
.find(
deserialize(filter),
timeoutContext?.csotEnabled()
? { timeoutMS: timeoutContext?.remainingTimeMS, timeoutMode: 'cursorLifetime' }
: {}
)
.find(deserialize(filter), {
timeoutContext: timeoutContext && new CursorTimeoutContext(timeoutContext, Symbol())
})
.toArray();
}
}
5 changes: 4 additions & 1 deletion src/operations/list_collections.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Binary, Document } from '../bson';
import { CursorResponse } from '../cmap/wire_protocol/responses';
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { type CursorTimeoutContext, type CursorTimeoutMode } from '../cursor/abstract_cursor';
import type { Db } from '../db';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
Expand All @@ -19,6 +19,9 @@ export interface ListCollectionsOptions extends Omit<CommandOperationOptions, 'w
batchSize?: number;
/** @internal */
timeoutMode?: CursorTimeoutMode;

/** @internal */
timeoutContext?: CursorTimeoutContext;
}

/** @internal */
Expand Down
68 changes: 67 additions & 1 deletion test/integration/client-side-encryption/driver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type Binary, EJSON, UUID } from 'bson';
import { expect } from 'chai';
import * as crypto from 'crypto';
import * as sinon from 'sinon';
import { setTimeout } from 'timers/promises';

// eslint-disable-next-line @typescript-eslint/no-restricted-imports
import { ClientEncryption } from '../../../src/client-side-encryption/client_encryption';
Expand All @@ -15,7 +16,9 @@ import {
MongoCryptCreateDataKeyError,
MongoCryptCreateEncryptedCollectionError,
MongoOperationTimeoutError,
StateMachine
resolveTimeoutOptions,
StateMachine,
TimeoutContext
} from '../../mongodb';
import {
clearFailPoint,
Expand All @@ -25,6 +28,7 @@ import {
measureDuration,
sleep
} from '../../tools/utils';
import { filterForCommands } from '../shared';

const metadata: MongoDBMetadataUI = {
requires: {
Expand Down Expand Up @@ -950,6 +954,68 @@ describe('CSOT', function () {
}
);

context('when the cursor times out and a killCursors is executed', function () {
let client: MongoClient;
let commands: (CommandStartedEvent & { command: { maxTimeMS?: number } })[] = [];

beforeEach(async function () {
client = this.configuration.newClient({}, { monitorCommands: true });
commands = [];
client.on('commandStarted', filterForCommands('killCursors', commands));

await client.connect();
const docs = Array.from({ length: 1200 }, (_, i) => ({ i }));

await client.db('test').collection('test').insertMany(docs);

await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['getMore'],
blockConnection: true,
blockTimeMS: 2000
}
});
});

afterEach(async function () {
await clearFailPoint(this.configuration);
await client.close();
});

it(
'refreshes timeoutMS to the full timeout',
{
requires: {
...metadata.requires,
topology: '!load-balanced'
}
},
async function () {
const timeoutContext = TimeoutContext.create(
resolveTimeoutOptions(client, { timeoutMS: 1900 })
);

await setTimeout(1500);

const { result: error } = await measureDuration(() =>
stateMachine
.fetchKeys(client, 'test.test', BSON.serialize({}), timeoutContext)
.catch(e => e)
);
expect(error).to.be.instanceOf(MongoOperationTimeoutError);

const [
{
command: { maxTimeMS }
}
] = commands;
expect(maxTimeMS).to.be.greaterThan(1800);
}
);
});

context('when csot is not enabled and fetchKeys() is delayed', function () {
let encryptedClient;

Expand Down
49 changes: 30 additions & 19 deletions test/integration/crud/client_bulk_write.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import {
clearFailPoint,
configureFailPoint,
makeMultiBatchWrite,
makeMultiResponseBatchModelArray
makeMultiResponseBatchModelArray,
mergeTestMetadata
} from '../../tools/utils';
import { filterForCommands } from '../shared';

Expand Down Expand Up @@ -268,7 +269,7 @@ describe('Client Bulk Write', function () {

beforeEach(async function () {
client = this.configuration.newClient({}, { monitorCommands: true, minPoolSize: 5 });
client.on('commandStarted', filterForCommands(['getMore'], commands));
client.on('commandStarted', filterForCommands(['getMore', 'killCursors'], commands));
await client.connect();

await configureFailPoint(this.configuration, {
Expand All @@ -278,25 +279,35 @@ describe('Client Bulk Write', function () {
});
});

it('the bulk write operation times out', metadata, async function () {
const models = await makeMultiResponseBatchModelArray(this.configuration);
const start = now();
const timeoutError = await client
.bulkWrite(models, {
verboseResults: true,
timeoutMS: 1500
})
.catch(e => e);
it(
'the bulk write operation times out',
mergeTestMetadata(metadata, {
requires: {
// this test has timing logic that depends on killCursors being executed, which does
// not happen in load balanced mode
topology: '!load-balanced'
}
}),
async function () {
const models = await makeMultiResponseBatchModelArray(this.configuration);
const start = now();
const timeoutError = await client
.bulkWrite(models, {
verboseResults: true,
timeoutMS: 1500
})
.catch(e => e);

const end = now();
expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError);
const end = now();
expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError);

// DRIVERS-3005 - killCursors causes cursor cleanup to extend past timeoutMS.
// The amount of time killCursors takes is wildly variable and can take up to almost
// 600-700ms sometimes.
expect(end - start).to.be.within(1500, 1500 + 800);
expect(commands).to.have.lengthOf(1);
});
// DRIVERS-3005 - killCursors causes cursor cleanup to extend past timeoutMS.
// The amount of time killCursors takes is wildly variable and can take up to almost
// 600-700ms sometimes.
expect(end - start).to.be.within(1500, 1500 + 800);
expect(commands.map(({ commandName }) => commandName)).to.have.lengthOf(2);
}
);
});

describe('if the cursor encounters an error and a killCursors is sent', function () {
Expand Down
16 changes: 16 additions & 0 deletions test/tools/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,19 @@ export async function measureDuration<T>(f: () => Promise<T>): Promise<{
result
};
}

export function mergeTestMetadata(
metadata: MongoDBMetadataUI,
newMetadata: MongoDBMetadataUI
): MongoDBMetadataUI {
return {
requires: {
...metadata.requires,
...newMetadata.requires
},
sessions: {
...metadata.sessions,
...newMetadata.sessions
}
};
}
37 changes: 22 additions & 15 deletions test/unit/client-side-encryption/state_machine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import {
BSON,
Collection,
CSOTTimeoutContext,
CursorTimeoutContext,
type FindOptions,
Int32,
Long,
MongoClient,
Expand Down Expand Up @@ -484,26 +486,29 @@ describe('StateMachine', function () {
});

context('when StateMachine.fetchKeys() is passed a `CSOTimeoutContext`', function () {
it('collection.find runs with its timeoutMS property set to remainingTimeMS', async function () {
const timeoutContext = new CSOTTimeoutContext({
it('collection.find uses the provided timeout context', async function () {
const context = new CSOTTimeoutContext({
timeoutMS: 500,
serverSelectionTimeoutMS: 30000
});
await sleep(300);

await stateMachine
.fetchKeys(client, 'keyVault', BSON.serialize({ a: 1 }), timeoutContext)
.fetchKeys(client, 'keyVault', BSON.serialize({ a: 1 }), context)
.catch(e => squashError(e));
expect(findSpy.getCalls()[0].args[1].timeoutMS).to.not.be.undefined;
expect(findSpy.getCalls()[0].args[1].timeoutMS).to.be.lessThanOrEqual(205);

const { timeoutContext } = findSpy.getCalls()[0].args[1] as FindOptions;
expect(timeoutContext).to.be.instanceOf(CursorTimeoutContext);
expect(timeoutContext.timeoutContext).to.equal(context);
});
});

context('when StateMachine.fetchKeys() is not passed a `CSOTimeoutContext`', function () {
it('collection.find runs with an undefined timeoutMS property', async function () {
it('a timeoutContext is not provided to the find cursor', async function () {
await stateMachine
.fetchKeys(client, 'keyVault', BSON.serialize({ a: 1 }))
.catch(e => squashError(e));
expect(findSpy.getCalls()[0].args[1].timeoutMS).to.be.undefined;
const { timeoutContext } = findSpy.getCalls()[0].args[1] as FindOptions;
expect(timeoutContext).to.be.undefined;
});
});
});
Expand Down Expand Up @@ -564,29 +569,31 @@ describe('StateMachine', function () {
context(
'when StateMachine.fetchCollectionInfo() is passed a `CSOTimeoutContext`',
function () {
it('listCollections runs with its timeoutMS property set to remainingTimeMS', async function () {
const timeoutContext = new CSOTTimeoutContext({
it('listCollections uses the provided timeoutContext', async function () {
const context = new CSOTTimeoutContext({
timeoutMS: 500,
serverSelectionTimeoutMS: 30000
});
await sleep(300);
await stateMachine
.fetchCollectionInfo(client, 'keyVault', BSON.serialize({ a: 1 }), timeoutContext)
.fetchCollectionInfo(client, 'keyVault', BSON.serialize({ a: 1 }), context)
.catch(e => squashError(e));
expect(listCollectionsSpy.getCalls()[0].args[1].timeoutMS).to.not.be.undefined;
expect(listCollectionsSpy.getCalls()[0].args[1].timeoutMS).to.be.lessThanOrEqual(205);
const [_filter, { timeoutContext }] = listCollectionsSpy.getCalls()[0].args;
expect(timeoutContext).to.exist;
expect(timeoutContext.timeoutContext).to.equal(context);
});
}
);

context(
'when StateMachine.fetchCollectionInfo() is not passed a `CSOTimeoutContext`',
function () {
it('listCollections runs with an undefined timeoutMS property', async function () {
it('no timeoutContext is provided to listCollections', async function () {
await stateMachine
.fetchCollectionInfo(client, 'keyVault', BSON.serialize({ a: 1 }))
.catch(e => squashError(e));
expect(listCollectionsSpy.getCalls()[0].args[1].timeoutMS).to.be.undefined;
const [_filter, { timeoutContext }] = listCollectionsSpy.getCalls()[0].args;
expect(timeoutContext).not.to.exist;
});
}
);
Expand Down

0 comments on commit 5dd8ee5

Please # to comment.