Skip to content

Commit 2cb4894

Browse files
authored
feat(changeStream): Adding new 4.0 ChangeStream features
Adds new ChangeStream features for 4.0 as per SPEC-1057 - Db.watch() method - MongoClient.watch() method - startAtClusterTime option Fixes NODE-1483
1 parent 7018c1e commit 2cb4894

13 files changed

+1940
-303
lines changed

lib/change_stream.js

+213-185
Large diffs are not rendered by default.

lib/collection.js

+7-6
Original file line numberDiff line numberDiff line change
@@ -2453,14 +2453,15 @@ Collection.prototype.aggregate = function(pipeline, options, callback) {
24532453
* Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection.
24542454
* @method
24552455
* @since 3.0.0
2456-
* @param {Array} [pipeline=null] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
2457-
* @param {object} [options=null] Optional settings
2456+
* @param {Array} [pipeline] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
2457+
* @param {object} [options] Optional settings
24582458
* @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
2459-
* @param {object} [options.resumeAfter=null] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
2459+
* @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
24602460
* @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
2461-
* @param {number} [options.batchSize=null] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
2462-
* @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
2463-
* @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
2461+
* @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
2462+
* @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
2463+
* @param {ReadPreference} [options.readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
2464+
* @param {Timestamp} [options.startAtClusterTime] receive change events that occur after the specified timestamp
24642465
* @param {ClientSession} [options.session] optional session to use for this operation
24652466
* @return {ChangeStream} a ChangeStream instance.
24662467
*/

lib/cursor.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -1076,7 +1076,13 @@ Cursor.prototype.close = function(options, callback) {
10761076
};
10771077

10781078
if (this.s.session) {
1079-
return this._endSession(() => completeClose());
1079+
if (typeof callback === 'function') {
1080+
return this._endSession(() => completeClose());
1081+
}
1082+
1083+
return new this.s.promiseLibrary(resolve => {
1084+
this._endSession(() => completeClose().then(resolve));
1085+
});
10801086
}
10811087

10821088
return completeClose();

lib/db.js

+30
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const mergeOptionsAndWriteConcern = require('./utils').mergeOptionsAndWriteConce
1717
const executeOperation = require('./utils').executeOperation;
1818
const applyWriteConcern = require('./utils').applyWriteConcern;
1919
const convertReadPreference = require('./utils').convertReadPreference;
20+
const ChangeStream = require('./change_stream');
2021

2122
// Operations
2223
const addUser = require('./operations/db_ops').addUser;
@@ -876,6 +877,35 @@ Db.prototype.unref = function() {
876877
this.s.topology.unref();
877878
};
878879

880+
/**
881+
* Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this database. Will ignore all changes to system collections.
882+
* @method
883+
* @since 3.1.0
884+
* @param {Array} [pipeline] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
885+
* @param {object} [options] Optional settings
886+
* @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
887+
* @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
888+
* @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
889+
* @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
890+
* @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
891+
* @param {ReadPreference} [options.readPreference] The read preference. Defaults to the read preference of the database. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
892+
* @param {Timestamp} [options.startAtClusterTime] receive change events that occur after the specified timestamp
893+
* @param {ClientSession} [options.session] optional session to use for this operation
894+
* @return {ChangeStream} a ChangeStream instance.
895+
*/
896+
Db.prototype.watch = function(pipeline, options) {
897+
pipeline = pipeline || [];
898+
options = options || {};
899+
900+
// Allow optionally not specifying a pipeline
901+
if (!Array.isArray(pipeline)) {
902+
options = pipeline;
903+
pipeline = [];
904+
}
905+
906+
return new ChangeStream(this, pipeline, options);
907+
};
908+
879909
/**
880910
* Db close event
881911
*

lib/mongo_client.js

+30
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const shallowClone = require('./utils').shallowClone;
1717
const authenticate = require('./authenticate');
1818
const ServerSessionPool = require('mongodb-core').Sessions.ServerSessionPool;
1919
const executeOperation = require('./utils').executeOperation;
20+
const ChangeStream = require('./change_stream');
2021

2122
const legacyParse = deprecate(
2223
require('./url_parser'),
@@ -596,6 +597,35 @@ MongoClient.prototype.withSession = function(options, operation) {
596597
return cleanupHandler(err, null, { throw: false });
597598
}
598599
};
600+
/**
601+
* Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this cluster. Will ignore all changes to system collections, as well as the local, admin,
602+
* and config databases.
603+
* @method
604+
* @since 3.1.0
605+
* @param {Array} [pipeline] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
606+
* @param {object} [options] Optional settings
607+
* @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
608+
* @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
609+
* @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
610+
* @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
611+
* @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
612+
* @param {ReadPreference} [options.readPreference] The read preference. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
613+
* @param {Timestamp} [options.startAtClusterTime] receive change events that occur after the specified timestamp
614+
* @param {ClientSession} [options.session] optional session to use for this operation
615+
* @return {ChangeStream} a ChangeStream instance.
616+
*/
617+
MongoClient.prototype.watch = function(pipeline, options) {
618+
pipeline = pipeline || [];
619+
options = options || {};
620+
621+
// Allow optionally not specifying a pipeline
622+
if (!Array.isArray(pipeline)) {
623+
options = pipeline;
624+
pipeline = [];
625+
}
626+
627+
return new ChangeStream(this, pipeline, options);
628+
};
599629

600630
var mergeOptions = function(target, source, flatten) {
601631
for (var name in source) {

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
"eslint-plugin-prettier": "^2.2.0",
2828
"istanbul": "^0.4.5",
2929
"jsdoc": "3.5.5",
30+
"lodash.camelcase": "^4.3.0",
3031
"mongodb-extjson": "^2.1.1",
3132
"mongodb-mock-server": "^1.0.0",
3233
"mongodb-test-runner": "^1.1.18",
+258
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
'use strict';
2+
3+
const EJSON = require('mongodb-extjson');
4+
const chai = require('chai');
5+
const fs = require('fs');
6+
const camelCase = require('lodash.camelcase');
7+
const MongoClient = require('../../lib/mongo_client');
8+
const setupDatabase = require('./shared').setupDatabase;
9+
const delay = require('./shared').delay;
10+
const expect = chai.expect;
11+
12+
describe('Change Stream Spec', function() {
13+
const EJSONToJSON = x => JSON.parse(EJSON.stringify(x));
14+
15+
let globalClient;
16+
let ctx;
17+
let events;
18+
19+
before(function() {
20+
return setupDatabase(this.configuration).then(() => {
21+
globalClient = new MongoClient(this.configuration.url());
22+
return globalClient.connect();
23+
});
24+
});
25+
26+
after(function() {
27+
const gc = globalClient;
28+
globalClient = undefined;
29+
return new Promise(r => gc.close(() => r()));
30+
});
31+
32+
fs
33+
.readdirSync(`${__dirname}/spec/change-stream`)
34+
.filter(filename => filename.match(/\.json$/))
35+
.forEach(filename => {
36+
const specString = fs.readFileSync(`${__dirname}/spec/change-stream/${filename}`, 'utf8');
37+
const specData = JSON.parse(specString);
38+
39+
const ALL_DBS = [specData.database_name, specData.database2_name];
40+
41+
describe(filename, () => {
42+
beforeEach(function() {
43+
const gc = globalClient;
44+
const sDB = specData.database_name;
45+
const sColl = specData.collection_name;
46+
return Promise.all(ALL_DBS.map(db => gc.db(db).dropDatabase()))
47+
.then(() => gc.db(sDB).createCollection(sColl))
48+
.then(() =>
49+
new MongoClient(this.configuration.url(), { monitorCommands: true }).connect()
50+
)
51+
.then(client => {
52+
ctx = { gc, client };
53+
events = [];
54+
const _events = events;
55+
56+
ctx.database = ctx.client.db(sDB);
57+
ctx.collection = ctx.database.collection(sColl);
58+
ctx.client.on('commandStarted', e => _events.push(e));
59+
});
60+
});
61+
62+
afterEach(function() {
63+
const client = ctx.client;
64+
ctx = undefined;
65+
events = undefined;
66+
67+
return client && client.close();
68+
});
69+
70+
specData.tests.forEach(test => {
71+
const itFn = test.skip ? it.skip : test.only ? it.only : it;
72+
const metadata = generateMetadata(test);
73+
const testFn = generateTestFn(test);
74+
75+
itFn(test.description, { metadata, test: testFn });
76+
});
77+
});
78+
});
79+
80+
// Fn Generator methods
81+
82+
function generateMetadata(test) {
83+
const mongodb = test.minServerVersion;
84+
const topology = test.topology;
85+
const requires = {};
86+
if (mongodb) {
87+
requires.mongodb = `>=${mongodb}`;
88+
}
89+
if (topology) {
90+
requires.topology = topology;
91+
}
92+
93+
return { requires };
94+
}
95+
96+
function generateTestFn(test) {
97+
const testFnRunOperations = makeTestFnRunOperations(test);
98+
const testSuccess = makeTestSuccess(test);
99+
const testFailure = makeTestFailure(test);
100+
const testAPM = makeTestAPM(test);
101+
102+
return function testFn() {
103+
return testFnRunOperations(ctx)
104+
.then(testSuccess, testFailure)
105+
.then(() => testAPM(ctx, events));
106+
};
107+
}
108+
109+
function makeTestSuccess(test) {
110+
const result = test.result;
111+
112+
return function testSuccess(value) {
113+
if (result.error) {
114+
throw new Error(`Expected test to return error ${result.error}`);
115+
}
116+
117+
if (result.success) {
118+
value = EJSONToJSON(value);
119+
assertEquality(value, result.success);
120+
}
121+
};
122+
}
123+
124+
function makeTestFailure(test) {
125+
const result = test.result;
126+
127+
return function testFailure(err) {
128+
if (!result.error) {
129+
throw err;
130+
}
131+
132+
assertEquality(err, result.error);
133+
};
134+
}
135+
136+
function makeTestAPM(test) {
137+
const expectedEvents = test.expectations;
138+
139+
return function testAPM(ctx, events) {
140+
expectedEvents
141+
.map(e => e.command_started_event)
142+
.map(normalizeAPMEvent)
143+
.forEach((expected, idx) => {
144+
if (!events[idx]) {
145+
throw new Error(
146+
`Expected there to be an APM event at index ${idx}, but there was none`
147+
);
148+
}
149+
const actual = EJSONToJSON(events[idx]);
150+
assertEquality(actual, expected);
151+
});
152+
};
153+
}
154+
155+
function makeTestFnRunOperations(test) {
156+
const target = test.target;
157+
const operations = test.operations;
158+
const success = test.result.success || [];
159+
160+
return function testFnRunOperations(ctx) {
161+
const changeStreamPipeline = test.changeStreamPipeline;
162+
const changeStreamOptions = test.changeStreamOptions;
163+
ctx.changeStream = ctx[target].watch(changeStreamPipeline, changeStreamOptions);
164+
165+
const changeStreamPromise = readAndCloseChangeStream(ctx.changeStream, success.length);
166+
const operationsPromise = runOperations(ctx.gc, operations);
167+
168+
return Promise.all([changeStreamPromise, operationsPromise]).then(args => args[0]);
169+
};
170+
}
171+
172+
function readAndCloseChangeStream(changeStream, numChanges) {
173+
const close = makeChangeStreamCloseFn(changeStream);
174+
let changeStreamPromise = changeStream.next().then(r => [r]);
175+
176+
for (let i = 1; i < numChanges; i += 1) {
177+
changeStreamPromise = changeStreamPromise.then(results => {
178+
return changeStream.next().then(result => {
179+
results.push(result);
180+
return results;
181+
});
182+
});
183+
}
184+
185+
return changeStreamPromise.then(result => close(null, result), err => close(err));
186+
}
187+
188+
function runOperations(client, operations) {
189+
return operations
190+
.map(op => makeOperation(client, op))
191+
.reduce((p, op) => p.then(op), delay(200));
192+
}
193+
194+
function makeChangeStreamCloseFn(changeStream) {
195+
return function close(error, value) {
196+
return new Promise((resolve, reject) => {
197+
changeStream.close(err => {
198+
if (error || err) {
199+
return reject(error || err);
200+
}
201+
return resolve(value);
202+
});
203+
});
204+
};
205+
}
206+
207+
function normalizeAPMEvent(raw) {
208+
return Object.keys(raw).reduce((agg, key) => {
209+
agg[camelCase(key)] = raw[key];
210+
return agg;
211+
}, {});
212+
}
213+
214+
function makeOperation(client, op) {
215+
const target = client.db(op.database).collection(op.collection);
216+
const command = op.name;
217+
const args = [];
218+
if (op.arguments && op.arguments.document) {
219+
args.push(op.arguments.document);
220+
}
221+
return () => target[command].apply(target, args);
222+
}
223+
224+
function assertEquality(actual, expected) {
225+
try {
226+
_assertEquality(actual, expected);
227+
} catch (e) {
228+
console.dir(actual, { depth: 999 });
229+
console.dir(expected, { depth: 999 });
230+
throw e;
231+
}
232+
}
233+
234+
function _assertEquality(actual, expected) {
235+
try {
236+
if (expected === '42' || expected === 42) {
237+
expect(actual).to.exist;
238+
return;
239+
}
240+
241+
expect(actual).to.be.a(Array.isArray(expected) ? 'array' : typeof expected);
242+
243+
if (expected == null) {
244+
expect(actual).to.not.exist;
245+
} else if (Array.isArray(expected)) {
246+
expected.forEach((ex, idx) => _assertEquality(actual[idx], ex));
247+
} else if (typeof expected === 'object') {
248+
for (let i in expected) {
249+
_assertEquality(actual[i], expected[i]);
250+
}
251+
} else {
252+
expect(actual).to.equal(expected);
253+
}
254+
} catch (e) {
255+
throw e;
256+
}
257+
}
258+
});

0 commit comments

Comments
 (0)