diff --git a/README.md b/README.md index 7704dbe1..a24b65d9 100644 --- a/README.md +++ b/README.md @@ -345,6 +345,7 @@ create a snapshot point }); +You can automatically clean older snapshots by configuring the number of snapshots to keep with `maxSnapshotsCount` in `eventstore` options. ## own event dispatching (no event publisher function defined) diff --git a/lib/base.js b/lib/base.js index 4429b055..205f0d4a 100644 --- a/lib/base.js +++ b/lib/base.js @@ -24,6 +24,11 @@ function implementError (callback) { throw err; } +function silentWarning(callback) { + console.warn('Snapshot cleaning is not implemented for this kind of store'); + callback(); +} + _.extend(Store.prototype, { /** @@ -106,6 +111,15 @@ _.extend(Store.prototype, { implementError(callback); }, + /** + * stores a new snapshot + * @param {Object} query the query object + * @param {Function} callback the function that will be called when this action has finished [optional] + */ + cleanSnapshots: function(query, callback) { + silentWarning(callback); + }, + /** * stores the passed events * @param {Array} evts the events diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index ee925ce2..9f5dc9a5 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -83,7 +83,7 @@ _.extend(Elastic.prototype, { return; } - var noAggId = false + var noAggId = false; var bulkMap = []; _.forEach(events, function (evt) { @@ -106,32 +106,45 @@ _.extend(Elastic.prototype, { }); }, - _search: function (type, find, sort, skip, limit, callback) { - var options = this.options; - var searchOptions = { + _buildQuery: function(type, find, sort, skip, limit) { + var query = { index: this.options.indexName, type: type, defaultOperator: 'AND', from: (!skip ? 0 : skip), - size: (!limit || limit === -1 ? options.maxSearchResults : limit) + size: (!limit || limit === -1 ? this.options.maxSearchResults - (skip || 0) : limit) }; - if (find && find.length) searchOptions.q = find.join(' AND '); - if (sort && sort.length) searchOptions.sort = sort; + if (find && find.length) query.q = find.join(' AND '); + if (sort && sort.length) query.sort = sort; + return query; + }, + + _search: function (type, find, sort, skip, limit, callback) { + var options = this.options; + var searchOptions = this._buildQuery(type, find, sort, skip, limit); this.client.search(searchOptions, function (error, response) { var dataList = []; - if (response && response.hits && response.hits.hits && response.hits.hits.length) { - if (response.hits.hits.length >= options.maxSearchResults){ - var errMsg = 'reached maximum of ' + options.maxSearchResults + ' search results!'; - debug(errMsg); - if (callback) callback(new Error(errMsg)); - return; + if (response) { + if (response.error) { + var error = new Error(response.error.root_cause[0].reason); + debug(error.message); + return callback(error); + } + if (response.hits && response.hits.hits && response.hits.hits.length) { + if (response.hits.hits.length >= options.maxSearchResults){ + var errMsg = 'reached maximum of ' + options.maxSearchResults + ' search results!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + dataList = response.hits.hits.map(function (data) { + data._source.commitStamp = new Date(data._source.commitStamp); + return data._source; + }); } - dataList = response.hits.hits.map(function (data) { - data._source.commitStamp = new Date(data._source.commitStamp); - return data._source; - }); } + callback(null, dataList); }); }, @@ -242,6 +255,46 @@ _.extend(Elastic.prototype, { }); }, + cleanSnapshots: function (query, callback) { + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var findStatement = ['aggregateId:' + query.aggregateId]; + + if (query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query.context) findStatement.push('context:' + query.context); + + var self = this; + this._searchSnapshots(findStatement, this.options.maxSnapshotsCount, -1, function (error, response) { + if (error) { + return callback(error); + } + self._bulkDelete(self.options.snapshotsTypeName, response, callback); + }); + }, + + _bulkDelete: function (type, items, callback) { + var index = this.options.indexName; + var deleteStatements = items.map(function(item) { + return { + delete: { + _index: index, + _type: type, + _id: item.id + } + }; + }); + this.client.bulk({ + body: deleteStatements + }, function(error, response) { + callback(error, response ? response.items.length : 0); + }); + }, + getSnapshot: function (query, revMax, callback) { if (!query.aggregateId) { var errMsg = 'aggregateId not defined!'; diff --git a/lib/databases/inmemory.js b/lib/databases/inmemory.js index 82fa3e68..02634b98 100644 --- a/lib/databases/inmemory.js +++ b/lib/databases/inmemory.js @@ -11,6 +11,7 @@ function InMemory(options) { this.store = {}; this.snapshots = {}; this.undispatchedEvents = { _direct: {} }; + this.options = options; } util.inherits(InMemory, Store); @@ -426,6 +427,26 @@ _.extend(InMemory.prototype, { } } callback(null, null); + }, + + cleanSnapshots: function(query, callback) { + var aggregateId = query.aggregateId; + var aggregate = query.aggregate || '_general'; + var context = query.context || '_general'; + + if (!aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var snapshots = this.snapshots[context][aggregate][aggregateId] || []; + var length = snapshots.length; + snapshots = snapshots.slice(-1 * this.options.maxSnapshotsCount); + this.snapshots[context][aggregate][aggregateId] = snapshots; + + callback(null, length - snapshots.length); } }); diff --git a/lib/databases/mongodb.js b/lib/databases/mongodb.js index 5708cdb9..5bb267ec 100644 --- a/lib/databases/mongodb.js +++ b/lib/databases/mongodb.js @@ -396,6 +396,33 @@ _.extend(Mongo.prototype, { this.snapshots.insert(snap, callback); }, + cleanSnapshots: function (query, callback) { + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var findStatement = { + aggregateId: query.aggregateId + }; + + if (query.aggregate) { + findStatement.aggregate = query.aggregate; + } + + if (query.context) { + findStatement.context = query.context; + } + + this.snapshots.find(findStatement, { + sort: [['revision', 'desc'], ['version', 'desc'], ['commitStamp', 'desc']] + }) + .skip(this.options.maxSnapshotsCount) + .toArray(removeElements(this.snapshots, callback)); + }, + getSnapshot: function (query, revMax, callback) { if (!query.aggregateId) { var errMsg = 'aggregateId not defined!'; @@ -570,4 +597,23 @@ _.extend(Mongo.prototype, { }); +function removeElements(collection, callback) { + return function (error, elements) { + if (error) { + debug(error); + return callback(error); + } + async.each(elements, function (element, callback) { + try { + collection.deleteOne({_id: element._id}); + callback(); + } catch (error) { + callback(error); + } + }, function(error) { + callback(error, elements.length); + }); + } +} + module.exports = Mongo; diff --git a/lib/databases/redis.js b/lib/databases/redis.js index 17e7c070..b68ce788 100644 --- a/lib/databases/redis.js +++ b/lib/databases/redis.js @@ -579,7 +579,26 @@ _.extend(Redis.prototype, { }); }, - getSnapshot: function (query, revMax, callback) { + cleanSnapshots: function (query, callback) { + var self = this; + + this.scanSnapshots(query, function(error, keys) { + if (error) { + debug(error); + if (callback) callback(error); + return; + } + + var keysToDelete = keys + .sort() + .slice(0, -1 * self.options.maxSnapshotsCount) + .concat(callback); + + self.client.del.apply(self.client, keysToDelete); + }); + }, + + scanSnapshots: function (query, callback) { if (!query.aggregateId) { var errMsg = 'aggregateId not defined!'; debug(errMsg); @@ -587,8 +606,6 @@ _.extend(Redis.prototype, { return; } - var self = this; - var aggregateId = query.aggregateId; var aggregate = query.aggregate || '*'; var context = query.context || '*'; @@ -599,47 +616,55 @@ _.extend(Redis.prototype, { function (keys, fn) { allKeys = allKeys.concat(keys); fn(); - }, function (err) { - if (err) { - debug(err); - if (callback) callback(err); - return; - } + }, function (error) { + callback(error, allKeys); + } + ); + }, - allKeys = _.sortBy(allKeys, function (s) { - return s; - }).reverse(); + getSnapshot: function (query, revMax, callback) { + var self = this; - if (revMax === -1) { // by default the last snapshot is kept - allKeys = allKeys.slice(0, 1); - } + this.scanSnapshots(query, function (err, allKeys) { + if (err) { + debug(err); + if (callback) callback(err); + return; + } - if (allKeys.length === 0) { - return callback(null, null); - } + allKeys = _.sortBy(allKeys, function (s) { + return s; + }).reverse(); - // iterating recursively over snapshots, from latest to oldest - (function nextSnapshot(key) { - self.client.get(key, function (err, res) { - if (err) { - debug(err); - return callback(err); - } + if (revMax === -1) { // by default the last snapshot is kept + allKeys = allKeys.slice(0, 1); + } - var snapshot = jsondate.parse(res); - if (revMax > -1 && snapshot.revision > revMax) { - if (allKeys.length > 0) { - nextSnapshot(allKeys.shift()); - } else { - callback(null, null); - } + if (allKeys.length === 0) { + return callback(null, null); + } + + // iterating recursively over snapshots, from latest to oldest + (function nextSnapshot(key) { + self.client.get(key, function (err, res) { + if (err) { + debug(err); + return callback(err); + } + + var snapshot = jsondate.parse(res); + if (revMax > -1 && snapshot.revision > revMax) { + if (allKeys.length > 0) { + nextSnapshot(allKeys.shift()); } else { - callback(null, snapshot); + callback(null, null); } - }); - })(allKeys.shift()); - } - ); + } else { + callback(null, snapshot); + } + }); + })(allKeys.shift()); + }) } }); diff --git a/lib/databases/tingodb.js b/lib/databases/tingodb.js index 8f4e7f9d..a13cac8c 100644 --- a/lib/databases/tingodb.js +++ b/lib/databases/tingodb.js @@ -283,6 +283,36 @@ _.extend(Tingo.prototype, { this.snapshots.insert(snap, callback); }, + cleanSnapshots: function (query, callback) { + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var findStatement = { + '$or': [ + { aggregateId: query.aggregateId }, + { streamId: query.aggregateId } // just for compatability of < 1.0.0 + ] + }; + + if (query.aggregate) { + findStatement.aggregate = query.aggregate; + } + + if (query.context) { + findStatement.context = query.context; + } + + this.snapshots.find(findStatement, { + sort: [['revision', 'desc'], ['version', 'desc'], ['commitStamp', 'desc']] + }) + .skip(this.options.maxSnapshotsCount) + .toArray(removeElements(this.snapshots, callback)); + }, + getSnapshot: function (query, revMax, callback) { if (!query.aggregateId) { var errMsg = 'aggregateId not defined!'; @@ -460,4 +490,23 @@ _.extend(Tingo.prototype, { }); +function removeElements(collection, callback) { + return function (error, elements) { + if (error) { + debug(error); + return callback(error); + } + async.each(elements, function (element, callback) { + try { + collection.remove({_id: element._id}); + callback(); + } catch (error) { + callback(error); + } + }, function(error) { + callback(error, elements.length); + }); + } +} + module.exports = Tingo; diff --git a/lib/eventstore.js b/lib/eventstore.js index 17bc774a..7e3f0a8a 100644 --- a/lib/eventstore.js +++ b/lib/eventstore.js @@ -363,18 +363,25 @@ _.extend(Eventstore.prototype, { var self = this; async.waterfall([ - function getNewIdFromStorage(callback) { - self.getNewId(callback); - }, - function commit(id, callback) { - try { - var snap = new Snapshot(id, obj); - snap.commitStamp = new Date(); - self.store.addSnapshot(snap, callback); - } catch (err) { - callback(err); - } - }], + function getNewIdFromStorage(callback) { + self.getNewId(callback); + }, + function commit(id, callback) { + try { + var snap = new Snapshot(id, obj); + snap.commitStamp = new Date(); + } catch (err) { + return callback(err); + } + + self.store.addSnapshot(snap, function(error) { + if (self.options.maxSnapshotsCount) { + self.store.cleanSnapshots(_.pick(obj, 'aggregateId', 'aggregate', 'context'), callback); + } else { + callback(error); + } + }); + }], callback ); }, diff --git a/test/eventstoreTest.js b/test/eventstoreTest.js index abab0a0f..671299a5 100644 --- a/test/eventstoreTest.js +++ b/test/eventstoreTest.js @@ -739,6 +739,75 @@ describe('eventstore', function () { }); + describe('cleanSnapshots', function () { + + var es = eventstore({ + maxSnapshotsCount: 5 + }), + orgFunc = es.store.cleanSnapshots, + addSnapshot = es.store.addSnapshot; + + before(function (done) { + es.store.addSnapshot = function (snap, callback) { + callback(); + }; + es.init(done); + }); + + after(function () { + es.store.cleanSnapshots = orgFunc; + es.store.addSnapshot = addSnapshot; + }); + + describe('with streamId', function () { + + it('it should pass them correctly', function (done) { + + var obj = { + streamId: 'myAggId', + aggregate: 'myAgg', + context: 'myCont', + data: { snap: 'data' } + }; + + es.store.cleanSnapshots = function (query, callback) { + expect(query.aggregateId).to.eql(obj.streamId); + expect(query.aggregate).to.eql(obj.aggregate); + expect(query.context).to.eql(obj.context); + expect(callback).to.be.a('function'); + callback(); + }; + + es.createSnapshot(obj, done); + }); + + }); + + describe('with options not activated', function () { + + before(function () { + es.options.maxSnapshotsCount = 0; + }); + + it('it should not clean snapshots', function (done) { + + var obj = { + streamId: 'myAggId', + aggregate: 'myAgg', + context: 'myCont', + data: { snap: 'data' } + }; + + es.store.cleanSnapshots = function (query, callback) { + callback(new Error('clean snapshots should not have been called')); + }; + + es.createSnapshot(obj, done); + }); + + }); + }); + describe('setEventToDispatched', function () { var es = eventstore(), diff --git a/test/storeTest.js b/test/storeTest.js index ebaba154..a7370418 100644 --- a/test/storeTest.js +++ b/test/storeTest.js @@ -4,7 +4,7 @@ var expect = require('expect.js'), _ = require('lodash'), crypto = require('crypto'); -var types = ['inmemory', 'tingodb', 'mongodb', 'redis'/*, 'azuretable', 'elasticsearch', 'dynamodb'*/]; +var types = ['inmemory', 'tingodb', 'mongodb', 'redis'/*, 'elasticsearch', 'azuretable', 'dynamodb'*/]; var token = crypto.randomBytes(16).toString('hex'); @@ -28,6 +28,7 @@ types.forEach(function (type) { describe('creating an instance', function () { before(function () { + options = {}; if (type === "azuretable" || type === "dynamodb") { options = { eventsTableName: 'events' + token, @@ -40,6 +41,7 @@ types.forEach(function (type) { db: 3 }; } + options.maxSnapshotsCount = 5; store = new Store(options); }); @@ -2928,6 +2930,164 @@ types.forEach(function (type) { }); + describe('cleaning snapshots', function () { + + describe('having some snapshots in the eventstore calling cleanSnapshot', function () { + + var snap1 = { + id: 'rev3', + aggregateId: '920193847', + aggregate: 'myCoolAggregate', + context: 'myCoolContext', + commitStamp: new Date(Date.now() + 405), + revision: 3, + version: 1, + data: { + mySnappi: 'data' + } + }; + + var snap2 = { + id: 'rev4', + aggregateId: '920193847', + aggregate: 'myCoolAggregate', + context: 'myCoolContext', + commitStamp: new Date(Date.now() + 410), + revision: 4, + version: 1, + data: { + mySnappi: 'data2' + } + }; + + var snap3 = { + id: 'rev5', + aggregateId: '920193847', + aggregate: 'myCoolAggregate', + context: 'myCoolContext', + commitStamp: new Date(Date.now() + 420), + revision: 5, + version: 1, + data: { + mySnappi: 'data3' + } + }; + + var snap4 = { + id: 'rev9', + aggregateId: '920193847', + aggregate: 'myCoolAggregate', + context: 'myCoolContext', + commitStamp: new Date(Date.now() + 430), + revision: 9, + version: 1, + data: { + mySnappi: 'data4' + } + }; + + var snap5 = { + id: 'rev10', + aggregateId: '920193847', + aggregate: 'myCoolAggregate', + context: 'myCoolContext', + commitStamp: new Date(Date.now() + 440), + revision: 10, + version: 1, + data: { + mySnappi: 'dataXY' + } + }; + + var snap6 = { + id: 'rev12', + aggregateId: '920193847', + aggregate: 'myCoolAggregate', + context: 'myCoolContext', + commitStamp: new Date(Date.now() + 450), + revision: 12, + version: 1, + data: { + mySnappi: 'dataaaaa' + } + }; + + var snap7 = { + id: 'rev16', + aggregateId: '920193847', + aggregate: 'myCoolAggregate', + context: 'myCoolContext', + commitStamp: new Date(Date.now() + 555), + revision: 16, + version: 1, + data: { + mySnappi: 'dataaaaa2' + } + }; + + var snap8 = { + id: 'rev17', + aggregateId: '920193847', + aggregate: 'myCoolAggregate', + context: 'myCoolContext', + commitStamp: new Date(Date.now() + 575), + revision: 17, + version: 1, + data: { + mySnappi: 'dataaaaa3' + } + }; + + beforeEach(function (done) { + async.series([ + function (callback) { + store.addSnapshot(snap1, callback); + }, + function (callback) { + store.addSnapshot(snap2, callback); + }, + function (callback) { + store.addSnapshot(snap3, callback); + }, + function (callback) { + store.addSnapshot(snap4, callback); + }, + function (callback) { + store.addSnapshot(snap5, callback); + }, + function (callback) { + store.addSnapshot(snap6, callback); + }, + function (callback) { + store.addSnapshot(snap7, callback); + }, + function (callback) { + store.addSnapshot(snap8, callback); + } + ], done); + }); + + describe('with an aggregateId being used only in one context and aggregate', function () { + + it('it should clean oldest snapshots', function (done) { + + store.cleanSnapshots({ + aggregateId: '920193847', + aggregate: 'myCoolAggregate', + context: 'myCoolContext' + }, function (err, cleanedCount) { + expect(err).not.to.be.ok(); + expect(cleanedCount).to.equal(3); + done(); + }); + + }); + }); + + }); + + }); + }); });