Skip to content

Commit 3d3da40

Browse files
authored
feat(NODE-6329): client bulk write happy path (#4206)
1 parent 643a875 commit 3d3da40

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2875
-81
lines changed

Diff for: src/cmap/command_monitoring_events.ts

+16-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ import {
77
LEGACY_HELLO_COMMAND_CAMEL_CASE
88
} from '../constants';
99
import { calculateDurationInMs, deepCopy } from '../utils';
10-
import { OpMsgRequest, type OpQueryRequest, type WriteProtocolMessageType } from './commands';
10+
import {
11+
DocumentSequence,
12+
OpMsgRequest,
13+
type OpQueryRequest,
14+
type WriteProtocolMessageType
15+
} from './commands';
1116
import type { Connection } from './connection';
1217

1318
/**
@@ -249,7 +254,16 @@ const OP_QUERY_KEYS = [
249254
/** Extract the actual command from the query, possibly up-converting if it's a legacy format */
250255
function extractCommand(command: WriteProtocolMessageType): Document {
251256
if (command instanceof OpMsgRequest) {
252-
return deepCopy(command.command);
257+
const cmd = deepCopy(command.command);
258+
// For OP_MSG with payload type 1 we need to pull the documents
259+
// array out of the document sequence for monitoring.
260+
if (cmd.ops instanceof DocumentSequence) {
261+
cmd.ops = cmd.ops.documents;
262+
}
263+
if (cmd.nsInfo instanceof DocumentSequence) {
264+
cmd.nsInfo = cmd.nsInfo.documents;
265+
}
266+
return cmd;
253267
}
254268

255269
if (command.query?.$query) {

Diff for: src/cmap/commands.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -544,10 +544,10 @@ export class OpMsgRequest {
544544
for (const [key, value] of Object.entries(document)) {
545545
if (value instanceof DocumentSequence) {
546546
// Document sequences starts with type 1 at the first byte.
547-
const buffer = Buffer.allocUnsafe(1 + 4 + key.length);
547+
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
548548
buffer[0] = 1;
549-
// Third part is the field name at offset 5.
550-
encodeUTF8Into(buffer, key, 5);
549+
// Third part is the field name at offset 5 with trailing null byte.
550+
encodeUTF8Into(buffer, `${key}\0`, 5);
551551
chunks.push(buffer);
552552
// Fourth part are the documents' bytes.
553553
let docsLength = 0;
@@ -557,7 +557,7 @@ export class OpMsgRequest {
557557
chunks.push(docBson);
558558
}
559559
// Second part of the sequence is the length at offset 1;
560-
buffer.writeInt32LE(key.length + docsLength, 1);
560+
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
561561
// Why are we removing the field from the command? This is because it needs to be
562562
// removed in the OP_MSG request first section, and DocumentSequence is not a
563563
// BSON type and is specific to the MongoDB wire protocol so there's nothing

Diff for: src/cmap/wire_protocol/responses.ts

+26
Original file line numberDiff line numberDiff line change
@@ -329,3 +329,29 @@ export class ExplainedCursorResponse extends CursorResponse {
329329
return this.toObject(options);
330330
}
331331
}
332+
333+
/**
334+
* Client bulk writes have some extra metadata at the top level that needs to be
335+
* included in the result returned to the user.
336+
*/
337+
export class ClientBulkWriteCursorResponse extends CursorResponse {
338+
get insertedCount() {
339+
return this.get('nInserted', BSONType.int, true);
340+
}
341+
342+
get upsertedCount() {
343+
return this.get('nUpserted', BSONType.int, true);
344+
}
345+
346+
get matchedCount() {
347+
return this.get('nMatched', BSONType.int, true);
348+
}
349+
350+
get modifiedCount() {
351+
return this.get('nModified', BSONType.int, true);
352+
}
353+
354+
get deletedCount() {
355+
return this.get('nDeleted', BSONType.int, true);
356+
}
357+
}

Diff for: src/cursor/client_bulk_write_cursor.ts

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import type { Document } from '../bson';
2+
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
3+
import { MongoBulkWriteCursorError } from '../error';
4+
import type { MongoClient } from '../mongo_client';
5+
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
6+
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
7+
import { executeOperation } from '../operations/execute_operation';
8+
import type { ClientSession } from '../sessions';
9+
import { mergeOptions, MongoDBNamespace } from '../utils';
10+
import {
11+
AbstractCursor,
12+
type AbstractCursorOptions,
13+
type InitialCursorResponse
14+
} from './abstract_cursor';
15+
16+
/** @public */
17+
export interface ClientBulkWriteCursorOptions
18+
extends Omit<AbstractCursorOptions, 'maxAwaitTimeMS' | 'tailable' | 'awaitData'>,
19+
ClientBulkWriteOptions {}
20+
21+
/**
22+
* This is the cursor that handles client bulk write operations. Note this is never
23+
* exposed directly to the user and is always immediately exhausted.
24+
* @internal
25+
*/
26+
export class ClientBulkWriteCursor extends AbstractCursor {
27+
public readonly command: Document;
28+
/** @internal */
29+
private cursorResponse?: ClientBulkWriteCursorResponse;
30+
/** @internal */
31+
private clientBulkWriteOptions: ClientBulkWriteOptions;
32+
33+
/** @internal */
34+
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
35+
super(client, new MongoDBNamespace('admin', '$cmd'), options);
36+
37+
this.command = command;
38+
this.clientBulkWriteOptions = options;
39+
}
40+
41+
/**
42+
* We need a way to get the top level cursor response fields for
43+
* generating the bulk write result, so we expose this here.
44+
*/
45+
get response(): ClientBulkWriteCursorResponse {
46+
if (this.cursorResponse) return this.cursorResponse;
47+
throw new MongoBulkWriteCursorError(
48+
'No client bulk write cursor response returned from the server.'
49+
);
50+
}
51+
52+
clone(): ClientBulkWriteCursor {
53+
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
54+
delete clonedOptions.session;
55+
return new ClientBulkWriteCursor(this.client, this.command, {
56+
...clonedOptions
57+
});
58+
}
59+
60+
/** @internal */
61+
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
62+
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
63+
...this.clientBulkWriteOptions,
64+
...this.cursorOptions,
65+
session
66+
});
67+
68+
const response = await executeOperation(this.client, clientBulkWriteOperation);
69+
this.cursorResponse = response;
70+
71+
return { server: clientBulkWriteOperation.server, session, response };
72+
}
73+
}

Diff for: src/error.ts

+27
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,33 @@ export class MongoGCPError extends MongoOIDCError {
616616
}
617617
}
618618

619+
/**
620+
* An error indicating that an error occurred when processing bulk write results.
621+
*
622+
* @public
623+
* @category Error
624+
*/
625+
export class MongoBulkWriteCursorError extends MongoRuntimeError {
626+
/**
627+
* **Do not use this constructor!**
628+
*
629+
* Meant for internal use only.
630+
*
631+
* @remarks
632+
* This class is only meant to be constructed within the driver. This constructor is
633+
* not subject to semantic versioning compatibility guarantees and may change at any time.
634+
*
635+
* @public
636+
**/
637+
constructor(message: string) {
638+
super(message);
639+
}
640+
641+
override get name(): string {
642+
return 'MongoBulkWriteCursorError';
643+
}
644+
}
645+
619646
/**
620647
* An error generated when a ChangeStream operation fails to execute.
621648
*

Diff for: src/index.ts

+16
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export {
4444
MongoAWSError,
4545
MongoAzureError,
4646
MongoBatchReExecutionError,
47+
MongoBulkWriteCursorError,
4748
MongoChangeStreamError,
4849
MongoCompatibilityError,
4950
MongoCursorExhaustedError,
@@ -473,6 +474,21 @@ export type {
473474
AggregateOptions,
474475
DB_AGGREGATE_COLLECTION
475476
} from './operations/aggregate';
477+
export type {
478+
AnyClientBulkWriteModel,
479+
ClientBulkWriteOptions,
480+
ClientBulkWriteResult,
481+
ClientDeleteManyModel,
482+
ClientDeleteOneModel,
483+
ClientDeleteResult,
484+
ClientInsertOneModel,
485+
ClientInsertOneResult,
486+
ClientReplaceOneModel,
487+
ClientUpdateManyModel,
488+
ClientUpdateOneModel,
489+
ClientUpdateResult,
490+
ClientWriteModel
491+
} from './operations/client_bulk_write/common';
476492
export type {
477493
CollationOptions,
478494
CommandOperation,

Diff for: src/mongo_client.ts

+19
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ import {
3030
SeverityLevel
3131
} from './mongo_logger';
3232
import { TypedEventEmitter } from './mongo_types';
33+
import {
34+
type AnyClientBulkWriteModel,
35+
type ClientBulkWriteOptions,
36+
type ClientBulkWriteResult
37+
} from './operations/client_bulk_write/common';
38+
import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
3339
import { executeOperation } from './operations/execute_operation';
3440
import { RunAdminCommandOperation } from './operations/run_command';
3541
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
@@ -477,6 +483,19 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
477483
return this.s.bsonOptions;
478484
}
479485

486+
/**
487+
* Executes a client bulk write operation, available on server 8.0+.
488+
* @param models - The client bulk write models.
489+
* @param options - The client bulk write options.
490+
* @returns A ClientBulkWriteResult for acknowledged writes and ok: 1 for unacknowledged writes.
491+
*/
492+
async bulkWrite(
493+
models: AnyClientBulkWriteModel[],
494+
options?: ClientBulkWriteOptions
495+
): Promise<ClientBulkWriteResult | { ok: 1 }> {
496+
return await new ClientBulkWriteExecutor(this, models, options).execute();
497+
}
498+
480499
/**
481500
* Connect to MongoDB using a url
482501
*
+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { type Document } from 'bson';
2+
3+
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
4+
import type { Server } from '../../sdam/server';
5+
import type { ClientSession } from '../../sessions';
6+
import { MongoDBNamespace } from '../../utils';
7+
import { CommandOperation } from '../command';
8+
import { Aspect, defineAspects } from '../operation';
9+
import { type ClientBulkWriteOptions } from './common';
10+
11+
/**
12+
* Executes a single client bulk write operation within a potential batch.
13+
* @internal
14+
*/
15+
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
16+
command: Document;
17+
override options: ClientBulkWriteOptions;
18+
19+
override get commandName() {
20+
return 'bulkWrite' as const;
21+
}
22+
23+
constructor(command: Document, options: ClientBulkWriteOptions) {
24+
super(undefined, options);
25+
this.command = command;
26+
this.options = options;
27+
this.ns = new MongoDBNamespace('admin', '$cmd');
28+
}
29+
30+
/**
31+
* Execute the command. Superclass will handle write concern, etc.
32+
* @param server - The server.
33+
* @param session - The session.
34+
* @returns The response.
35+
*/
36+
override async execute(
37+
server: Server,
38+
session: ClientSession | undefined
39+
): Promise<ClientBulkWriteCursorResponse> {
40+
return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
41+
}
42+
}
43+
44+
// Skipping the collation as it goes on the individual ops.
45+
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);

0 commit comments

Comments
 (0)