diff --git a/src/client-side-encryption/auto_encrypter.ts b/src/client-side-encryption/auto_encrypter.ts index 18e2b62cc4..caca83e367 100644 --- a/src/client-side-encryption/auto_encrypter.ts +++ b/src/client-side-encryption/auto_encrypter.ts @@ -375,8 +375,8 @@ export class AutoEncrypter { /** * Cleans up the `_mongocryptdClient`, if present. */ - async teardown(force: boolean): Promise { - await this._mongocryptdClient?.close(force); + async close(): Promise { + await this._mongocryptdClient?.close(); } /** diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 71f509481b..00321ec234 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -17,6 +17,7 @@ import { } from '../constants'; import { type AnyError, + MongoClientClosedError, type MongoError, MongoInvalidArgumentError, MongoMissingCredentialsError, @@ -484,11 +485,17 @@ export class ConnectionPool extends TypedEventEmitter { for (const connection of this.checkedOut) { if (connection.generation <= minGeneration) { connection.onError(new PoolClearedOnNetworkError(this)); - this.checkIn(connection); } } } + /** For MongoClient.close() procedures */ + public closeCheckedOutConnections() { + for (const conn of this.checkedOut) { + conn.onError(new MongoClientClosedError()); + } + } + /** Close the pool */ close(): void { if (this.closed) { diff --git a/src/encrypter.ts b/src/encrypter.ts index 3c7bf2aaed..5a627ea67e 100644 --- a/src/encrypter.ts +++ b/src/encrypter.ts @@ -1,11 +1,8 @@ -import { callbackify } from 'util'; - import { AutoEncrypter, type AutoEncryptionOptions } from './client-side-encryption/auto_encrypter'; import { MONGO_CLIENT_EVENTS } from './constants'; import { getMongoDBClientEncryption } from './deps'; import { MongoInvalidArgumentError, MongoMissingDependencyError } from './error'; import { MongoClient, type MongoClientOptions } from './mongo_client'; -import { type Callback } from './utils'; /** @internal */ export interface EncrypterOptions { @@ -98,20 +95,16 @@ export class Encrypter { } } - closeCallback(client: MongoClient, force: boolean, callback: Callback) { - callbackify(this.close.bind(this))(client, force, callback); - } - - async close(client: MongoClient, force: boolean): Promise { + async close(client: MongoClient): Promise { let error; try { - await this.autoEncrypter.teardown(force); + await this.autoEncrypter.close(); } catch (autoEncrypterError) { error = autoEncrypterError; } const internalClient = this.internalClient; if (internalClient != null && client !== internalClient) { - return await internalClient.close(force); + return await internalClient.close(); } if (error != null) { throw error; diff --git a/src/error.ts b/src/error.ts index 31ae5c9c4d..08e4b86d94 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1018,6 +1018,34 @@ export class MongoTopologyClosedError extends MongoAPIError { } } +/** + * An error generated when the MongoClient is closed and async + * operations are interrupted. + * + * @public + * @category Error + */ +export class MongoClientClosedError extends MongoAPIError { + /** + * **Do not use this constructor!** + * + * Meant for internal use only. + * + * @remarks + * This class is only meant to be constructed within the driver. This constructor is + * not subject to semantic versioning compatibility guarantees and may change at any time. + * + * @public + **/ + constructor() { + super('Operation interrupted because client was closed'); + } + + override get name(): string { + return 'MongoClientClosedError'; + } +} + /** @public */ export interface MongoNetworkErrorOptions { /** Indicates the timeout happened before a connection handshake completed */ diff --git a/src/index.ts b/src/index.ts index 476b5affc3..b886504295 100644 --- a/src/index.ts +++ b/src/index.ts @@ -53,6 +53,7 @@ export { MongoClientBulkWriteCursorError, MongoClientBulkWriteError, MongoClientBulkWriteExecutionError, + MongoClientClosedError, MongoCompatibilityError, MongoCursorExhaustedError, MongoCursorInUseError, diff --git a/src/mongo_client.ts b/src/mongo_client.ts index e77afc4026..fe3097c502 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -641,25 +641,27 @@ export class MongoClient extends TypedEventEmitter implements } /** - * Cleans up client-side resources used by the MongoCLient and . This includes: + * Cleans up client-side resources used by the MongoClient. * - * - Closes all open, unused connections (see note). + * This includes: + * + * - Closes in-use connections. + * - Closes all active cursors. * - Ends all in-use sessions with {@link ClientSession#endSession|ClientSession.endSession()}. + * - aborts in progress transactions if is one related to the session. * - Ends all unused sessions server-side. + * - Closes all remaining idle connections. * - Cleans up any resources being used for auto encryption if auto encryption is enabled. * - * @remarks Any in-progress operations are not killed and any connections used by in progress operations - * will be cleaned up lazily as operations finish. - * - * @param force - Force close, emitting no events + * @param _force - currently an unused flag that has no effect. Defaults to `false`. */ - async close(force = false): Promise { + async close(_force = false): Promise { if (this.closeLock) { return await this.closeLock; } try { - this.closeLock = this._close(force); + this.closeLock = this._close(); await this.closeLock; } finally { // release @@ -668,7 +670,7 @@ export class MongoClient extends TypedEventEmitter implements } /* @internal */ - private async _close(force = false): Promise { + private async _close(): Promise { // There's no way to set hasBeenClosed back to false Object.defineProperty(this.s, 'hasBeenClosed', { value: true, @@ -677,6 +679,8 @@ export class MongoClient extends TypedEventEmitter implements writable: false }); + this.topology?.closeCheckedOutConnections(); + const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close()); this.s.activeCursors.clear(); @@ -722,7 +726,7 @@ export class MongoClient extends TypedEventEmitter implements const { encrypter } = this.options; if (encrypter) { - await encrypter.close(this, force); + await encrypter.close(this); } } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index c679831697..4d7052e327 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -246,8 +246,12 @@ export class Server extends TypedEventEmitter { } } + closeCheckedOutConnections() { + return this.pool.closeCheckedOutConnections(); + } + /** Destroy the server connection */ - destroy(): void { + close(): void { if (this.s.state === STATE_CLOSED) { return; } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index a67f17dd9b..4da824d059 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -490,6 +490,12 @@ export class Topology extends TypedEventEmitter { } } + closeCheckedOutConnections() { + for (const server of this.s.servers.values()) { + return server.closeCheckedOutConnections(); + } + } + /** Close this topology */ close(): void { if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) { @@ -497,7 +503,7 @@ export class Topology extends TypedEventEmitter { } for (const server of this.s.servers.values()) { - destroyServer(server, this); + closeServer(server, this); } this.s.servers.clear(); @@ -791,12 +797,12 @@ export class Topology extends TypedEventEmitter { } /** Destroys a server, and removes all event listeners from the instance */ -function destroyServer(server: Server, topology: Topology) { +function closeServer(server: Server, topology: Topology) { for (const event of LOCAL_SERVER_EVENTS) { server.removeAllListeners(event); } - server.destroy(); + server.close(); topology.emitAndLog( Topology.SERVER_CLOSED, new ServerClosedEvent(topology.s.id, server.description.address) @@ -903,7 +909,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes // prepare server for garbage collection if (server) { - destroyServer(server, topology); + closeServer(server, topology); } } } diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index f8aabb8321..8cc7b3c3b1 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -63,6 +63,7 @@ describe('Change Streams', function () { await csDb.createCollection('test').catch(() => null); collection = csDb.collection('test'); changeStream = collection.watch(); + changeStream.on('error', () => null); }); afterEach(async () => { @@ -702,15 +703,19 @@ describe('Change Streams', function () { const outStream = new PassThrough({ objectMode: true }); - // @ts-expect-error: transform requires a Document return type - changeStream.stream({ transform: JSON.stringify }).pipe(outStream); + const transform = doc => ({ doc: JSON.stringify(doc) }); + changeStream + .stream({ transform }) + .on('error', () => null) + .pipe(outStream) + .on('error', () => null); const willBeData = once(outStream, 'data'); await collection.insertMany([{ a: 1 }]); const [data] = await willBeData; - const parsedEvent = JSON.parse(data); + const parsedEvent = JSON.parse(data.doc); expect(parsedEvent).to.have.nested.property('fullDocument.a', 1); outStream.destroy(); diff --git a/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts b/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts index 437961b120..a765f42afe 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_pool.test.ts @@ -3,6 +3,7 @@ import { once } from 'node:events'; import { expect } from 'chai'; import { type ConnectionPoolCreatedEvent, type Db, type MongoClient } from '../../mongodb'; +import { clearFailPoint, configureFailPoint, sleep } from '../../tools/utils'; describe('Connection Pool', function () { let client: MongoClient; @@ -64,5 +65,89 @@ describe('Connection Pool', function () { }); }); }); + + const metadata: MongoDBMetadataUI = { requires: { mongodb: '>=4.4', topology: 'single' } }; + + describe('ConnectionCheckedInEvent', metadata, function () { + let client: MongoClient; + + beforeEach(async function () { + if (!this.configuration.filters.MongoDBVersionFilter.filter({ metadata })) { + return; + } + if (!this.configuration.filters.MongoDBTopologyFilter.filter({ metadata })) { + return; + } + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['insert'], + blockConnection: true, + blockTimeMS: 500 + } + }); + + client = this.configuration.newClient(); + await client.connect(); + await Promise.all(Array.from({ length: 100 }, () => client.db().command({ ping: 1 }))); + }); + + afterEach(async function () { + if (this.configuration.filters.MongoDBVersionFilter.filter({ metadata })) { + await clearFailPoint(this.configuration); + } + await client.close(); + }); + + describe('when a MongoClient is closed', function () { + it( + 'a connection pool emits checked in events for closed connections', + metadata, + async () => { + const allClientEvents = []; + const pushToClientEvents = e => allClientEvents.push(e); + + client + .on('connectionCheckedOut', pushToClientEvents) + .on('connectionCheckedIn', pushToClientEvents) + .on('connectionClosed', pushToClientEvents); + + const inserts = Promise.allSettled([ + client.db('test').collection('test').insertOne({ a: 1 }), + client.db('test').collection('test').insertOne({ a: 1 }), + client.db('test').collection('test').insertOne({ a: 1 }) + ]); + + // wait until all pings are pending on the server + while (allClientEvents.filter(e => e.name === 'connectionCheckedOut').length < 3) { + await sleep(1); + } + + const insertConnectionIds = allClientEvents + .filter(e => e.name === 'connectionCheckedOut') + .map(({ address, connectionId }) => `${address} + ${connectionId}`); + + await client.close(); + + const insertCheckInAndCloses = allClientEvents + .filter(e => e.name === 'connectionCheckedIn' || e.name === 'connectionClosed') + .filter(({ address, connectionId }) => + insertConnectionIds.includes(`${address} + ${connectionId}`) + ); + + expect(insertCheckInAndCloses).to.have.lengthOf(6); + + // check that each check-in is followed by a close (not proceeded by one) + expect(insertCheckInAndCloses.map(e => e.name)).to.deep.equal( + Array.from({ length: 3 }, () => ['connectionCheckedIn', 'connectionClosed']).flat(1) + ); + + await inserts; + } + ); + }); + }); }); }); diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 2eca8a008a..c558ec5797 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -11,7 +11,7 @@ const { Writable } = require('stream'); const { once, on } = require('events'); const { setTimeout } = require('timers'); const { ReadPreference } = require('../../mongodb'); -const { ServerType } = require('../../mongodb'); +const { ServerType, MongoClientClosedError } = require('../../mongodb'); const { formatSort } = require('../../mongodb'); describe('Cursor', function () { @@ -1848,32 +1848,44 @@ describe('Cursor', function () { } }); - it('closes cursors when client is closed even if it has not been exhausted', async function () { - await client - .db() - .dropCollection('test_cleanup_tailable') - .catch(() => null); + it( + 'closes cursors when client is closed even if it has not been exhausted', + { requires: { topology: '!replicaset' } }, + async function () { + await client + .db() + .dropCollection('test_cleanup_tailable') + .catch(() => null); - const collection = await client - .db() - .createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 }); + const collection = await client + .db() + .createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 }); - // insert only 2 docs in capped coll of 3 - await collection.insertMany([{ a: 1 }, { a: 1 }]); + // insert only 2 docs in capped coll of 3 + await collection.insertMany([{ a: 1 }, { a: 1 }]); - const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 }); + const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 }); - await cursor.next(); - await cursor.next(); - // will block for maxAwaitTimeMS (except we are closing the client) - const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error); + await cursor.next(); + await cursor.next(); - await client.close(); - expect(cursor).to.have.property('closed', true); + const nextCommand = once(client, 'commandStarted'); + // will block for maxAwaitTimeMS (except we are closing the client) + const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error); - const error = await rejectedEarlyBecauseClientClosed; - expect(error).to.be.null; // TODO(NODE-6632): This should throw again after the client signal aborts the in-progress next call - }); + for ( + let [{ commandName }] = await nextCommand; + commandName !== 'getMore'; + [{ commandName }] = await once(client, 'commandStarted') + ); + + await client.close(); + expect(cursor).to.have.property('closed', true); + + const error = await rejectedEarlyBecauseClientClosed; + expect(error).to.be.instanceOf(MongoClientClosedError); + } + ); it('shouldAwaitData', { // Add a tag that our runner can trigger on diff --git a/test/integration/index_management.test.ts b/test/integration/index_management.test.ts index b79d8c1804..ee1855fc02 100644 --- a/test/integration/index_management.test.ts +++ b/test/integration/index_management.test.ts @@ -770,20 +770,19 @@ describe('Indexes', function () { expect(events).to.be.an('array').with.lengthOf(1); expect(events[0]).nested.property('command.commitQuorum').to.equal(0); - await collection.drop(err => { - expect(err).to.not.exist; - }); + await collection.drop(); } }; } it( 'should run command with commitQuorum if specified on db.createIndex', - commitQuorumTest((db, collection) => - db.createIndex(collection.collectionName, 'a', { - // @ts-expect-error revaluate this? - writeConcern: { w: 'majority' }, - commitQuorum: 0 - }) + commitQuorumTest( + async (db, collection) => + await db.createIndex(collection.collectionName, 'a', { + // @ts-expect-error revaluate this? + writeConcern: { w: 'majority' }, + commitQuorum: 0 + }) ) ); it( diff --git a/test/integration/node-specific/examples/causal_consistency.test.js b/test/integration/node-specific/examples/causal_consistency.test.js index 41e97323d8..7a4db5eda0 100644 --- a/test/integration/node-specific/examples/causal_consistency.test.js +++ b/test/integration/node-specific/examples/causal_consistency.test.js @@ -31,8 +31,8 @@ describe('examples(causal-consistency):', function () { it('supports causal consistency', async function () { const session = client.startSession({ causalConsistency: true }); - collection.insertOne({ darmok: 'jalad' }, { session }); - collection.updateOne({ darmok: 'jalad' }, { $set: { darmok: 'tanagra' } }, { session }); + await collection.insertOne({ darmok: 'jalad' }, { session }); + await collection.updateOne({ darmok: 'jalad' }, { $set: { darmok: 'tanagra' } }, { session }); const results = await collection.find({}, { session }).toArray(); diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index 9a0cea014b..dd3d012fdb 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -18,7 +18,7 @@ import { ServerDescription, Topology } from '../../mongodb'; -import { runLater } from '../../tools/utils'; +import { clearFailPoint, configureFailPoint, runLater } from '../../tools/utils'; import { setupDatabase } from '../shared'; describe('class MongoClient', function () { @@ -1035,7 +1035,7 @@ describe('class MongoClient', function () { client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev)); }); - it('are all closed', async () => { + it('are all closed', async function () { const cursors = Array.from({ length: 30 }, (_, skip) => collection.find({}, { skip, batchSize: 1 }) ); @@ -1043,7 +1043,7 @@ describe('class MongoClient', function () { expect(client.s.activeCursors).to.have.lengthOf(30); await client.close(); expect(client.s.activeCursors).to.have.lengthOf(0); - expect(kills).to.have.lengthOf(30); + expect(kills).to.have.lengthOf(this.configuration.topologyType === 'LoadBalanced' ? 0 : 30); }); it('creating cursors after close adds to activeCursors', async () => { @@ -1064,6 +1064,63 @@ describe('class MongoClient', function () { expect(client.s.activeCursors).to.have.lengthOf(1); }); }); + + const metadata: MongoDBMetadataUI = { requires: { mongodb: '>=4.4', topology: 'single' } }; + + describe( + 'maxPoolSize is not fully used when running clean up operations', + metadata, + function () { + let client; + + beforeEach(async function () { + if (!this.configuration.filters.MongoDBVersionFilter.filter({ metadata })) { + return; + } + if (!this.configuration.filters.MongoDBTopologyFilter.filter({ metadata })) { + return; + } + + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['insert'], + blockConnection: true, + blockTimeMS: 500 + } + }); + + client = this.configuration.newClient({}, { maxPoolSize: 1, monitorCommands: true }); + }); + + afterEach(async function () { + await clearFailPoint(this.configuration); + await client.close(); + }); + + it( + 'closes in-use connections before running clean up operations avoiding a deadlock', + metadata, + async () => { + const inserted = client + .db('t') + .collection('t') + .insertOne({ a: 1 }) + .catch(error => error); + + await once(client, 'commandStarted'); + + const start = performance.now(); + await client.close(); + await inserted; + const end = performance.now(); + + expect(end - start).to.be.lessThan(100); + } + ); + } + ); }); context('when connecting', function () { diff --git a/test/unit/index.test.ts b/test/unit/index.test.ts index b24639f2c8..653792492c 100644 --- a/test/unit/index.test.ts +++ b/test/unit/index.test.ts @@ -71,6 +71,7 @@ const EXPECTED_EXPORTS = [ 'MongoClientBulkWriteCursorError', 'MongoClientBulkWriteError', 'MongoClientBulkWriteExecutionError', + 'MongoClientClosedError', 'MongoCompatibilityError', 'MongoCryptAzureKMSRequestError', 'MongoCryptCreateDataKeyError',