Skip to content

Commit 054838f

Browse files
authored
refactor!: pull CursorStream out of Cursor
Cursors are no longer Readable streams, Cursor.stream() must now be called to get a Readable stream. NODE-2820
1 parent a3408e6 commit 054838f

14 files changed

+283
-249
lines changed

src/change_stream.ts

+66-117
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import Denque = require('denque');
22
import { EventEmitter } from 'events';
33
import { MongoError, AnyError, isResumableError } from './error';
4-
import { Cursor } from './cursor';
4+
import { Cursor, CursorOptions, CursorStream } from './cursor/cursor';
55
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
66
import { loadCollection, loadDb, loadMongoClient } from './dynamic_loaders';
77
import {
@@ -13,15 +13,14 @@ import {
1313
MongoDBNamespace,
1414
Callback
1515
} from './utils';
16-
import type { CursorOptions } from './cursor/cursor';
1716
import type { ReadPreference } from './read_preference';
1817
import type { Timestamp, Document } from './bson';
1918
import type { Topology } from './sdam/topology';
20-
import type { Writable } from 'stream';
21-
import type { StreamOptions } from './cursor/core_cursor';
2219
import type { OperationParent } from './operations/command';
2320
import type { CollationOptions } from './cmap/wire_protocol/write_command';
21+
import type { CursorStreamOptions } from './cursor/core_cursor';
2422
const kResumeQueue = Symbol('resumeQueue');
23+
const kCursorStream = Symbol('cursorStream');
2524

2625
const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
2726
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
@@ -34,6 +33,11 @@ const CHANGE_DOMAIN_TYPES = {
3433
CLUSTER: Symbol('Cluster')
3534
};
3635

36+
const NO_RESUME_TOKEN_ERROR = new MongoError(
37+
'A change stream document has been received that lacks a resume token (_id).'
38+
);
39+
const CHANGESTREAM_CLOSED_ERROR = new MongoError('ChangeStream is closed');
40+
3741
/** @public */
3842
export interface ResumeOptions {
3943
startAtOperationTime?: Timestamp;
@@ -155,6 +159,12 @@ interface UpdateDescription {
155159
removedFields: string[];
156160
}
157161

162+
export class ChangeStreamStream extends CursorStream {
163+
constructor(cursor: ChangeStreamCursor) {
164+
super(cursor);
165+
}
166+
}
167+
158168
/**
159169
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
160170
* @public
@@ -168,9 +178,9 @@ export class ChangeStream extends EventEmitter {
168178
topology: Topology;
169179
cursor?: ChangeStreamCursor;
170180
closed: boolean;
171-
pipeDestinations: Writable[] = [];
172-
streamOptions?: StreamOptions;
181+
streamOptions?: CursorStreamOptions;
173182
[kResumeQueue]: Denque;
183+
[kCursorStream]?: CursorStream;
174184

175185
/** @event */
176186
static readonly CLOSE = 'close' as const;
@@ -239,20 +249,24 @@ export class ChangeStream extends EventEmitter {
239249
this.closed = false;
240250

241251
// Listen for any `change` listeners being added to ChangeStream
242-
this.on('newListener', (eventName: string) => {
252+
this.on('newListener', eventName => {
243253
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
244-
this.cursor.on('data', change => processNewChange(this, change));
254+
streamEvents(this, this.cursor);
245255
}
246256
});
247257

248-
// Listen for all `change` listeners being removed from ChangeStream
249-
this.on('removeListener', (eventName: string) => {
258+
this.on('removeListener', eventName => {
250259
if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
251-
this.cursor.removeAllListeners('data');
260+
this[kCursorStream]?.removeAllListeners(CursorStream.DATA);
252261
}
253262
});
254263
}
255264

265+
/** @internal */
266+
get cursorStream(): CursorStream | undefined {
267+
return this[kCursorStream];
268+
}
269+
256270
/** The cached resume token that is used to resume after the most recently returned change. */
257271
get resumeToken(): ResumeToken {
258272
return this.cursor?.resumeToken;
@@ -305,83 +319,24 @@ export class ChangeStream extends EventEmitter {
305319
const cursor = this.cursor;
306320

307321
return cursor.close(err => {
308-
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
322+
endStream(this);
309323
this.cursor = undefined;
310-
311324
return cb(err);
312325
});
313326
});
314327
}
315328

316-
/**
317-
* This method pulls all the data out of a readable stream, and writes it to the supplied destination,
318-
* automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
319-
*
320-
* @param destination - The destination for writing data
321-
* @param options - {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options| NodeJS Pipe options}
322-
* @throws MongoError if this.cursor is undefined
323-
*/
324-
pipe(destination: Writable, options?: PipeOptions): Writable {
325-
if (!this.pipeDestinations) {
326-
this.pipeDestinations = [];
327-
}
328-
this.pipeDestinations.push(destination);
329-
if (!this.cursor) {
330-
throw new MongoError('ChangeStream has no cursor, unable to pipe');
331-
}
332-
return this.cursor.pipe(destination, options);
333-
}
334-
335-
/**
336-
* This method will remove the hooks set up for a previous pipe() call.
337-
*
338-
* @param destination - The destination for writing data
339-
* @throws MongoError if this.cursor is undefined
340-
*/
341-
unpipe(destination?: Writable): ChangeStreamCursor {
342-
const destinationIndex = destination ? this.pipeDestinations.indexOf(destination) : -1;
343-
if (this.pipeDestinations && destinationIndex > -1) {
344-
this.pipeDestinations.splice(destinationIndex, 1);
345-
}
346-
if (!this.cursor) {
347-
throw new MongoError('ChangeStream has no cursor, unable to unpipe');
348-
}
349-
return this.cursor.unpipe(destination);
350-
}
351-
352329
/**
353330
* Return a modified Readable stream including a possible transform method.
354331
* @throws MongoError if this.cursor is undefined
355332
*/
356-
stream(options?: StreamOptions): ChangeStreamCursor {
333+
stream(options?: CursorStreamOptions): ChangeStreamStream {
357334
this.streamOptions = options;
358335
if (!this.cursor) {
359336
throw new MongoError('ChangeStream has no cursor, unable to stream');
360337
}
361338
return this.cursor.stream(options);
362339
}
363-
364-
/**
365-
* This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
366-
* @throws MongoError if this.cursor is undefined
367-
*/
368-
pause(): ChangeStreamCursor {
369-
if (!this.cursor) {
370-
throw new MongoError('ChangeStream has no cursor, unable to pause');
371-
}
372-
return this.cursor.pause();
373-
}
374-
375-
/**
376-
* This method will cause the readable stream to resume emitting data events.
377-
* @throws MongoError if this.cursor is undefined
378-
*/
379-
resume(): ChangeStreamCursor {
380-
if (!this.cursor) {
381-
throw new MongoError('ChangeStream has no cursor, unable to resume');
382-
}
383-
return this.cursor.resume();
384-
}
385340
}
386341

387342
/** @public */
@@ -524,7 +479,6 @@ function createChangeStreamCursor(
524479

525480
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
526481
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
527-
528482
const changeStreamCursor = new ChangeStreamCursor(
529483
self.topology,
530484
new AggregateOperation(self.parent, pipeline, options),
@@ -533,23 +487,7 @@ function createChangeStreamCursor(
533487

534488
relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);
535489

536-
if (self.listenerCount(ChangeStream.CHANGE) > 0) {
537-
changeStreamCursor.on(ChangeStreamCursor.DATA, function (change) {
538-
processNewChange(self, change);
539-
});
540-
}
541-
542-
changeStreamCursor.on(ChangeStream.ERROR, function (error) {
543-
processError(self, error);
544-
});
545-
546-
if (self.pipeDestinations) {
547-
const cursorStream = changeStreamCursor.stream(self.streamOptions);
548-
for (const pipeDestination of self.pipeDestinations) {
549-
cursorStream.pipe(pipeDestination);
550-
}
551-
}
552-
490+
if (self.listenerCount(ChangeStream.CHANGE) > 0) streamEvents(self, changeStreamCursor);
553491
return changeStreamCursor;
554492
}
555493

@@ -595,28 +533,48 @@ function waitForTopologyConnected(
595533
}, 500); // this is an arbitrary wait time to allow SDAM to transition
596534
}
597535

536+
function closeWithError(changeStream: ChangeStream, error: AnyError, callback?: Callback): void {
537+
if (!callback) changeStream.emit(ChangeStream.ERROR, error);
538+
changeStream.close(() => callback && callback(error));
539+
}
540+
541+
function streamEvents(changeStream: ChangeStream, cursor: ChangeStreamCursor): void {
542+
const stream = changeStream[kCursorStream] || cursor.stream();
543+
changeStream[kCursorStream] = stream;
544+
stream.on(CursorStream.DATA, change => processNewChange(changeStream, change));
545+
stream.on(CursorStream.ERROR, error => processError(changeStream, error));
546+
}
547+
548+
function endStream(changeStream: ChangeStream): void {
549+
const cursorStream = changeStream[kCursorStream];
550+
if (cursorStream) {
551+
[CursorStream.DATA, CursorStream.CLOSE, CursorStream.END, CursorStream.ERROR].forEach(event =>
552+
cursorStream.removeAllListeners(event)
553+
);
554+
555+
cursorStream.destroy();
556+
}
557+
558+
changeStream[kCursorStream] = undefined;
559+
}
560+
598561
function processNewChange(
599562
changeStream: ChangeStream,
600563
change: ChangeStreamDocument,
601564
callback?: Callback
602565
) {
603-
// a null change means the cursor has been notified, implicitly closing the change stream
604-
if (change == null) {
605-
changeStream.closed = true;
606-
}
607-
608566
if (changeStream.closed) {
609-
if (callback) callback(new MongoError('ChangeStream is closed'));
567+
if (callback) callback(CHANGESTREAM_CLOSED_ERROR);
610568
return;
611569
}
612570

613-
if (change && !change._id) {
614-
const noResumeTokenError = new Error(
615-
'A change stream document has been received that lacks a resume token (_id).'
616-
);
571+
// a null change means the cursor has been notified, implicitly closing the change stream
572+
if (change == null) {
573+
return closeWithError(changeStream, CHANGESTREAM_CLOSED_ERROR, callback);
574+
}
617575

618-
if (!callback) return changeStream.emit(ChangeStream.ERROR, noResumeTokenError);
619-
return callback(noResumeTokenError);
576+
if (change && !change._id) {
577+
return closeWithError(changeStream, NO_RESUME_TOKEN_ERROR, callback);
620578
}
621579

622580
// cache the resume token
@@ -631,7 +589,7 @@ function processNewChange(
631589
return callback(undefined, change);
632590
}
633591

634-
function processError(changeStream: ChangeStream, error?: AnyError, callback?: Callback) {
592+
function processError(changeStream: ChangeStream, error: AnyError, callback?: Callback) {
635593
const topology = changeStream.topology;
636594
const cursor = changeStream.cursor;
637595

@@ -649,24 +607,15 @@ function processError(changeStream: ChangeStream, error?: AnyError, callback?: C
649607

650608
// otherwise, raise an error and close the change stream
651609
function unresumableError(err: AnyError) {
652-
if (!callback) {
653-
changeStream.emit(ChangeStream.ERROR, err);
654-
changeStream.emit(ChangeStream.CLOSE);
655-
}
656-
processResumeQueue(changeStream, err);
657-
changeStream.closed = true;
610+
if (!callback) changeStream.emit(ChangeStream.ERROR, err);
611+
changeStream.close(() => processResumeQueue(changeStream, err));
658612
}
659613

660614
if (cursor && isResumableError(error as MongoError, maxWireVersion(cursor.server))) {
661615
changeStream.cursor = undefined;
662616

663617
// stop listening to all events from old cursor
664-
[
665-
ChangeStreamCursor.DATA,
666-
ChangeStreamCursor.CLOSE,
667-
ChangeStreamCursor.END,
668-
ChangeStreamCursor.ERROR
669-
].forEach(event => cursor.removeAllListeners(event));
618+
endStream(changeStream);
670619

671620
// close internal cursor, ignore errors
672621
cursor.close();
@@ -691,8 +640,8 @@ function processError(changeStream: ChangeStream, error?: AnyError, callback?: C
691640
return;
692641
}
693642

694-
if (!callback) return changeStream.emit(ChangeStream.ERROR, error);
695-
return callback(error);
643+
// if initial error wasn't resumable, raise an error and close the change stream
644+
return closeWithError(changeStream, error, callback);
696645
}
697646

698647
/**

0 commit comments

Comments
 (0)