Skip to content

Clean snapshots automatically whenever activated #101

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 8 commits into from
Jan 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ create a snapshot point

});

You can automatically clean older snapshots by configuring the number of snapshots to keep with `maxSnapshotsCount` in `eventstore` options.

## own event dispatching (no event publisher function defined)

Expand Down
14 changes: 14 additions & 0 deletions lib/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ function implementError (callback) {
throw err;
}

function silentWarning(callback) {
console.warn('Snapshot cleaning is not implemented for this kind of store');
callback();
}

_.extend(Store.prototype, {

/**
Expand Down Expand Up @@ -106,6 +111,15 @@ _.extend(Store.prototype, {
implementError(callback);
},

/**
* stores a new snapshot
* @param {Object} query the query object
* @param {Function} callback the function that will be called when this action has finished [optional]
*/
cleanSnapshots: function(query, callback) {
silentWarning(callback);
},

/**
* stores the passed events
* @param {Array} evts the events
Expand Down
87 changes: 70 additions & 17 deletions lib/databases/elasticsearch.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ _.extend(Elastic.prototype, {
return;
}

var noAggId = false
var noAggId = false;
var bulkMap = [];

_.forEach(events, function (evt) {
Expand All @@ -106,32 +106,45 @@ _.extend(Elastic.prototype, {
});
},

_search: function (type, find, sort, skip, limit, callback) {
var options = this.options;
var searchOptions = {
_buildQuery: function(type, find, sort, skip, limit) {
var query = {
index: this.options.indexName,
type: type,
defaultOperator: 'AND',
from: (!skip ? 0 : skip),
size: (!limit || limit === -1 ? options.maxSearchResults : limit)
size: (!limit || limit === -1 ? this.options.maxSearchResults - (skip || 0) : limit)
};
if (find && find.length) searchOptions.q = find.join(' AND ');
if (sort && sort.length) searchOptions.sort = sort;
if (find && find.length) query.q = find.join(' AND ');
if (sort && sort.length) query.sort = sort;
return query;
},

_search: function (type, find, sort, skip, limit, callback) {
var options = this.options;
var searchOptions = this._buildQuery(type, find, sort, skip, limit);

this.client.search(searchOptions, function (error, response) {
var dataList = [];
if (response && response.hits && response.hits.hits && response.hits.hits.length) {
if (response.hits.hits.length >= options.maxSearchResults){
var errMsg = 'reached maximum of ' + options.maxSearchResults + ' search results!';
debug(errMsg);
if (callback) callback(new Error(errMsg));
return;
if (response) {
if (response.error) {
var error = new Error(response.error.root_cause[0].reason);
debug(error.message);
return callback(error);
}
if (response.hits && response.hits.hits && response.hits.hits.length) {
if (response.hits.hits.length >= options.maxSearchResults){
var errMsg = 'reached maximum of ' + options.maxSearchResults + ' search results!';
debug(errMsg);
if (callback) callback(new Error(errMsg));
return;
}
dataList = response.hits.hits.map(function (data) {
data._source.commitStamp = new Date(data._source.commitStamp);
return data._source;
});
}
dataList = response.hits.hits.map(function (data) {
data._source.commitStamp = new Date(data._source.commitStamp);
return data._source;
});
}

callback(null, dataList);
});
},
Expand Down Expand Up @@ -242,6 +255,46 @@ _.extend(Elastic.prototype, {
});
},

cleanSnapshots: function (query, callback) {
if (!query.aggregateId) {
var errMsg = 'aggregateId not defined!';
debug(errMsg);
if (callback) callback(new Error(errMsg));
return;
}

var findStatement = ['aggregateId:' + query.aggregateId];

if (query.aggregate) findStatement.push('aggregate:' + query.aggregate);
if (query.context) findStatement.push('context:' + query.context);

var self = this;
this._searchSnapshots(findStatement, this.options.maxSnapshotsCount, -1, function (error, response) {
if (error) {
return callback(error);
}
self._bulkDelete(self.options.snapshotsTypeName, response, callback);
});
},

_bulkDelete: function (type, items, callback) {
var index = this.options.indexName;
var deleteStatements = items.map(function(item) {
return {
delete: {
_index: index,
_type: type,
_id: item.id
}
};
});
this.client.bulk({
body: deleteStatements
}, function(error, response) {
callback(error, response ? response.items.length : 0);
});
},

getSnapshot: function (query, revMax, callback) {
if (!query.aggregateId) {
var errMsg = 'aggregateId not defined!';
Expand Down
21 changes: 21 additions & 0 deletions lib/databases/inmemory.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ function InMemory(options) {
this.store = {};
this.snapshots = {};
this.undispatchedEvents = { _direct: {} };
this.options = options;
}

util.inherits(InMemory, Store);
Expand Down Expand Up @@ -426,6 +427,26 @@ _.extend(InMemory.prototype, {
}
}
callback(null, null);
},

cleanSnapshots: function(query, callback) {
var aggregateId = query.aggregateId;
var aggregate = query.aggregate || '_general';
var context = query.context || '_general';

if (!aggregateId) {
var errMsg = 'aggregateId not defined!';
debug(errMsg);
if (callback) callback(new Error(errMsg));
return;
}

var snapshots = this.snapshots[context][aggregate][aggregateId] || [];
var length = snapshots.length;
snapshots = snapshots.slice(-1 * this.options.maxSnapshotsCount);
this.snapshots[context][aggregate][aggregateId] = snapshots;

callback(null, length - snapshots.length);
}

});
Expand Down
46 changes: 46 additions & 0 deletions lib/databases/mongodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,33 @@ _.extend(Mongo.prototype, {
this.snapshots.insert(snap, callback);
},

cleanSnapshots: function (query, callback) {
if (!query.aggregateId) {
var errMsg = 'aggregateId not defined!';
debug(errMsg);
if (callback) callback(new Error(errMsg));
return;
}

var findStatement = {
aggregateId: query.aggregateId
};

if (query.aggregate) {
findStatement.aggregate = query.aggregate;
}

if (query.context) {
findStatement.context = query.context;
}

this.snapshots.find(findStatement, {
sort: [['revision', 'desc'], ['version', 'desc'], ['commitStamp', 'desc']]
})
.skip(this.options.maxSnapshotsCount)
.toArray(removeElements(this.snapshots, callback));
},

getSnapshot: function (query, revMax, callback) {
if (!query.aggregateId) {
var errMsg = 'aggregateId not defined!';
Expand Down Expand Up @@ -570,4 +597,23 @@ _.extend(Mongo.prototype, {

});

function removeElements(collection, callback) {
return function (error, elements) {
if (error) {
debug(error);
return callback(error);
}
async.each(elements, function (element, callback) {
try {
collection.deleteOne({_id: element._id});
callback();
} catch (error) {
callback(error);
}
}, function(error) {
callback(error, elements.length);
});
}
}

module.exports = Mongo;
99 changes: 62 additions & 37 deletions lib/databases/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -579,16 +579,33 @@ _.extend(Redis.prototype, {
});
},

getSnapshot: function (query, revMax, callback) {
cleanSnapshots: function (query, callback) {
var self = this;

this.scanSnapshots(query, function(error, keys) {
if (error) {
debug(error);
if (callback) callback(error);
return;
}

var keysToDelete = keys
.sort()
.slice(0, -1 * self.options.maxSnapshotsCount)
.concat(callback);

self.client.del.apply(self.client, keysToDelete);
});
},

scanSnapshots: function (query, callback) {
if (!query.aggregateId) {
var errMsg = 'aggregateId not defined!';
debug(errMsg);
if (callback) callback(new Error(errMsg));
return;
}

var self = this;

var aggregateId = query.aggregateId;
var aggregate = query.aggregate || '*';
var context = query.context || '*';
Expand All @@ -599,47 +616,55 @@ _.extend(Redis.prototype, {
function (keys, fn) {
allKeys = allKeys.concat(keys);
fn();
}, function (err) {
if (err) {
debug(err);
if (callback) callback(err);
return;
}
}, function (error) {
callback(error, allKeys);
}
);
},

allKeys = _.sortBy(allKeys, function (s) {
return s;
}).reverse();
getSnapshot: function (query, revMax, callback) {
var self = this;

if (revMax === -1) { // by default the last snapshot is kept
allKeys = allKeys.slice(0, 1);
}
this.scanSnapshots(query, function (err, allKeys) {
if (err) {
debug(err);
if (callback) callback(err);
return;
}

if (allKeys.length === 0) {
return callback(null, null);
}
allKeys = _.sortBy(allKeys, function (s) {
return s;
}).reverse();

// iterating recursively over snapshots, from latest to oldest
(function nextSnapshot(key) {
self.client.get(key, function (err, res) {
if (err) {
debug(err);
return callback(err);
}
if (revMax === -1) { // by default the last snapshot is kept
allKeys = allKeys.slice(0, 1);
}

var snapshot = jsondate.parse(res);
if (revMax > -1 && snapshot.revision > revMax) {
if (allKeys.length > 0) {
nextSnapshot(allKeys.shift());
} else {
callback(null, null);
}
if (allKeys.length === 0) {
return callback(null, null);
}

// iterating recursively over snapshots, from latest to oldest
(function nextSnapshot(key) {
self.client.get(key, function (err, res) {
if (err) {
debug(err);
return callback(err);
}

var snapshot = jsondate.parse(res);
if (revMax > -1 && snapshot.revision > revMax) {
if (allKeys.length > 0) {
nextSnapshot(allKeys.shift());
} else {
callback(null, snapshot);
callback(null, null);
}
});
})(allKeys.shift());
}
);
} else {
callback(null, snapshot);
}
});
})(allKeys.shift());
})
}

});
Expand Down
Loading