From 3c9715c8b0e674cc2935ca5c7779daa5d5988b9b Mon Sep 17 00:00:00 2001 From: Scott Saltsgaver Date: Fri, 12 Feb 2021 02:30:08 -0500 Subject: [PATCH 1/3] Adding support for promises --- test/clean-async.js | 130 ++++++++++++++++++++++++++++ test/dead-queue-async.js | 175 +++++++++++++++++++++++++++++++++++++ test/default-async.js | 103 ++++++++++++++++++++++ test/delay-async.js | 103 ++++++++++++++++++++++ test/indexes-async.js | 27 ++++++ test/many-async.js | 74 ++++++++++++++++ test/multi-async.js | 68 +++++++++++++++ test/ping-async.js | 182 +++++++++++++++++++++++++++++++++++++++ test/stats-async.js | 165 +++++++++++++++++++++++++++++++++++ test/visibility-async.js | 169 ++++++++++++++++++++++++++++++++++++ 10 files changed, 1196 insertions(+) create mode 100644 test/clean-async.js create mode 100644 test/dead-queue-async.js create mode 100644 test/default-async.js create mode 100644 test/delay-async.js create mode 100644 test/indexes-async.js create mode 100644 test/many-async.js create mode 100644 test/multi-async.js create mode 100644 test/ping-async.js create mode 100644 test/stats-async.js create mode 100644 test/visibility-async.js diff --git a/test/clean-async.js b/test/clean-async.js new file mode 100644 index 0000000..1ce027c --- /dev/null +++ b/test/clean-async.js @@ -0,0 +1,130 @@ +var async = require('async') +var test = require('tape') + +var setup = require('./setup.js') +var mongoDbQueue = require('../mongodb-queue') + +setup(function(client, db) { + + test('clean: check deleted messages are deleted', function(t) { + var queue = mongoDbQueue(db, 'clean', { visibility : 3 }) + var msg + + async.series( + [ + async function() { + var size = await queue.size(); + t.pass('There is no error.') + t.equal(size, 0, 'There is currently nothing on the queue') + }, + async function() { + var size = await queue.total(); + t.pass('There is no error.') + t.equal(size, 0, 'There is currently nothing in the queue at all') + }, + async function() { + await queue.clean(); + t.pass('There is no error.') + }, + async function() { + var size = await queue.size(); + t.pass('There is no error.') + t.equal(size, 0, 'There is currently nothing on the queue') + }, + async function() { + var size = await queue.total(); + t.pass('There is no error.') + t.equal(size, 0, 'There is currently nothing in the queue at all') + }, + async function() { + await queue.add('Hello, World!'); + t.pass('There is no error when adding a message.') + }, + async function() { + await queue.clean(); + t.pass('There is no error.') + }, + async function() { + var size = await queue.size(); + t.pass('There is no error.') + t.equal(size, 1, 'Queue size is correct') + }, + async function() { + var size = await queue.total(); + t.pass('There is no error.') + t.equal(size, 1, 'Queue total is correct') + }, + async function() { + var newMsg = await queue.get(); + msg = newMsg + t.ok(msg.id, 'Got a msg.id (sanity check)') + }, + async function() { + var size = await queue.size(); + t.pass('There is no error.') + t.equal(size, 0, 'Queue size is correct') + }, + async function() { + var size = await queue.total(); + t.pass('There is no error.') + t.equal(size, 1, 'Queue total is correct') + }, + async function() { + await queue.clean(); + t.pass('There is no error.') + }, + async function() { + var size = await queue.size(); + t.pass('There is no error.') + t.equal(size, 0, 'Queue size is correct') + }, + async function() { + var size = await queue.total(); + t.pass('There is no error.') + t.equal(size, 1, 'Queue total is correct') + }, + async function() { + var id = await queue.ack(msg.ack); + t.pass('No error when acking the message') + t.ok(id, 'Received an id when acking this message') + }, + async function() { + var size = await queue.size(); + t.pass('There is no error.') + t.equal(size, 0, 'Queue size is correct') + }, + async function() { + var size = await queue.total(); + t.pass('There is no error.') + t.equal(size, 1, 'Queue total is correct') + }, + async function() { + await queue.clean(); + t.pass('There is no error.') + }, + async function() { + var size = await queue.size(); + t.pass('There is no error.') + t.equal(size, 0, 'Queue size is correct') + }, + async function() { + var size = await queue.total(); + t.pass('There is no error.') + t.equal(size, 0, 'Queue total is correct') + }, + ], + function(err) { + if (err) t.fail(err) + t.pass('Finished test ok') + t.end() + } + ) + }) + + test('client.close()', function(t) { + t.pass('client.close()') + client.close() + t.end() + }) + +}) diff --git a/test/dead-queue-async.js b/test/dead-queue-async.js new file mode 100644 index 0000000..6d01d23 --- /dev/null +++ b/test/dead-queue-async.js @@ -0,0 +1,175 @@ +var async = require('async') +var test = require('tape') + +var setup = require('./setup.js') +var mongoDbQueue = require('../') + +setup(function(client, db) { + + test('first test', function(t) { + var queue = mongoDbQueue(db, 'queue', { visibility : 3, deadQueue : 'dead-queue' }) + t.ok(queue, 'Queue created ok') + t.end() + }); + + test('single message going over 5 tries, should appear on dead-queue', function(t) { + var deadQueue = mongoDbQueue(db, 'dead-queue') + var queue = mongoDbQueue(db, 'queue', { visibility : 1, deadQueue : deadQueue }) + var msg + var origId + + async.series( + [ + async function() { + var id = await queue.add('Hello, World!'); + t.pass('There is no error when adding a message.') + t.ok(id, 'Received an id for this message') + origId = id + }, + async function() { + var thisMsg = await queue.get(); + await new Promise(function(resolve) { + setTimeout(function() { + t.pass('First expiration') + resolve() + }, 2 * 1000); + }); + }, + async function() { + var thisMsg = await queue.get(); + await new Promise(function(resolve) { + setTimeout(function() { + t.pass('Second expiration') + resolve() + }, 2 * 1000); + }); + }, + async function() { + var thisMsg = await queue.get(); + await new Promise(function(resolve) { + setTimeout(function() { + t.pass('Third expiration') + resolve() + }, 2 * 1000); + }); + }, + async function() { + var thisMsg = await queue.get(); + await new Promise(function(resolve) { + setTimeout(function() { + t.pass('Fourth expiration') + resolve() + }, 2 * 1000); + }); + }, + async function() { + var thisMsg = await queue.get(); + await new Promise(function(resolve) { + setTimeout(function() { + t.pass('Fifth expiration') + resolve() + }, 2 * 1000); + }); + }, + async function() { + var id = await queue.get(); + t.pass('No error when getting no messages') + t.ok(!id, 'No msg received') + }, + async function() { + var msg = await deadQueue.get(); + t.pass('No error when getting from the deadQueue') + t.ok(msg.id, 'Got a message id from the deadQueue') + t.equal(msg.payload.id, origId, 'Got the same message id as the original message') + t.equal(msg.payload.payload, 'Hello, World!', 'Got the same as the original message') + t.equal(msg.payload.tries, 6, 'Got the tries as 6') + }, + ], + function(err) { + t.ok(!err, 'No error during single round-trip test') + t.end() + } + ) + }) + + test('two messages, with first going over 3 tries', function(t) { + var deadQueue = mongoDbQueue(db, 'dead-queue-2') + var queue = mongoDbQueue(db, 'queue-2', { visibility : 1, deadQueue : deadQueue, maxRetries : 3 }) + var msg + var origId, origId2 + + async.series( + [ + async function() { + var id = await queue.add('Hello, World!'); + t.pass('There is no error when adding a message.') + t.ok(id, 'Received an id for this message') + origId = id + }, + async function() { + var id = await queue.add('Part II'); + t.pass('There is no error when adding another message.') + t.ok(id, 'Received an id for this message') + origId2 = id + }, + async function() { + var thisMsg = await queue.get(); + t.equal(thisMsg.id, origId, 'We return the first message on first go') + await new Promise(function(resolve) { + setTimeout(function() { + t.pass('First expiration') + resolve() + }, 2 * 1000); + }); + }, + async function() { + var thisMsg = await queue.get(); + t.equal(thisMsg.id, origId, 'We return the first message on second go') + await new Promise(function(resolve) { + setTimeout(function() { + t.pass('Second expiration') + resolve() + }, 2 * 1000); + }); + }, + async function() { + var thisMsg = await queue.get(); + t.equal(thisMsg.id, origId, 'We return the first message on third go') + await new Promise(function(resolve) { + setTimeout(function() { + t.pass('Third expiration') + resolve() + }, 2 * 1000); + }); + }, + async function() { + // This is the 4th time, so we SHOULD have moved it to the dead queue + // pior to it being returned. + msg = await queue.get(); + t.pass('No error when getting the 2nd message') + t.equal(msg.id, origId2, 'Got the ID of the 2nd message') + t.equal(msg.payload, 'Part II', 'Got the same payload as the 2nd message') + }, + async function() { + msg = await deadQueue.get(); + t.pass('No error when getting from the deadQueue') + t.ok(msg.id, 'Got a message id from the deadQueue') + t.equal(msg.payload.id, origId, 'Got the same message id as the original message') + t.equal(msg.payload.payload, 'Hello, World!', 'Got the same as the original message') + t.equal(msg.payload.tries, 4, 'Got the tries as 4') + }, + ], + function(err) { + t.ok(!err, 'No error during single round-trip test') + t.end() + } + ) + }) + + test('client.close()', function(t) { + t.pass('client.close()') + client.close() + t.end() + }) + +}) diff --git a/test/default-async.js b/test/default-async.js new file mode 100644 index 0000000..b8aa6dd --- /dev/null +++ b/test/default-async.js @@ -0,0 +1,103 @@ +var async = require('async') +var test = require('tape') + +var setup = require('./setup.js') +var mongoDbQueue = require('../') + +setup(function(client, db) { + + test('first test', function(t) { + var queue = mongoDbQueue(db, 'default') + t.ok(queue, 'Queue created ok') + t.end() + }); + + test('single round trip', function(t) { + var queue = mongoDbQueue(db, 'default') + var msg + + async.series( + [ + async function() { + var id = await queue.add('Hello, World!'); + t.pass('There is no error when adding a message.') + t.ok(id, 'Received an id for this message') + }, + async function() { + var thisMsg = await queue.get(); + console.log(thisMsg) + msg = thisMsg + t.ok(msg.id, 'Got a msg.id') + t.equal(typeof msg.id, 'string', 'msg.id is a string') + t.ok(msg.ack, 'Got a msg.ack') + t.equal(typeof msg.ack, 'string', 'msg.ack is a string') + t.ok(msg.tries, 'Got a msg.tries') + t.equal(typeof msg.tries, 'number', 'msg.tries is a number') + t.equal(msg.tries, 1, 'msg.tries is currently one') + t.equal(msg.payload, 'Hello, World!', 'Payload is correct') + }, + async function() { + var id = await queue.ack(msg.ack); + t.pass('No error when acking the message') + t.ok(id, 'Received an id when acking this message') + }, + ], + function(err) { + t.ok(!err, 'No error during single round-trip test') + t.end() + } + ) + }) + + test("single round trip, can't be acked again", function(t) { + var queue = mongoDbQueue(db, 'default') + var msg + + async.series( + [ + async function() { + var id = await queue.add('Hello, World!'); + t.pass('There is no error when adding a message.') + t.ok(id, 'Received an id for this message') + }, + async function() { + var thisMsg = await queue.get(); + msg = thisMsg + t.ok(msg.id, 'Got a msg.id') + t.equal(typeof msg.id, 'string', 'msg.id is a string') + t.ok(msg.ack, 'Got a msg.ack') + t.equal(typeof msg.ack, 'string', 'msg.ack is a string') + t.ok(msg.tries, 'Got a msg.tries') + t.equal(typeof msg.tries, 'number', 'msg.tries is a number') + t.equal(msg.tries, 1, 'msg.tries is currently one') + t.equal(msg.payload, 'Hello, World!', 'Payload is correct') + }, + async function() { + var id = await queue.ack(msg.ack); + t.pass('No error when acking the message') + t.ok(id, 'Received an id when acking this message') + }, + async function() { + try { + var id = await queue.ack(msg.ack); + t.notOk(id, 'Should not be here'); + } catch (err) { + t.ok(err, 'There is an error when acking the message again') + t.ok(!id, 'No id received when trying to ack an already deleted message') + } + }, + ], + function(err) { + t.ok(!err, 'No error during single round-trip when trying to double ack') + t.end() + } + ) + }) + + test('client.close()', function(t) { + t.pass('client.close()') + client.close() + t.end() + }) + +}) diff --git a/test/delay-async.js b/test/delay-async.js new file mode 100644 index 0000000..4046316 --- /dev/null +++ b/test/delay-async.js @@ -0,0 +1,103 @@ +var async = require('async') +var test = require('tape') + +var setup = require('./setup.js') +var mongoDbQueue = require('../') + +setup(function(client, db) { + + test('delay: check messages on this queue are returned after the delay', function(t) { + var queue = mongoDbQueue(db, 'delay', { delay : 3 }) + + async.series( + [ + async function() { + var id = await queue.add('Hello, World!'); + t.pass('There is no error when adding a message.') + t.ok(id, 'There is an id returned when adding a message.') + }, + async function() { + // get something now and it shouldn't be there + var msg = await queue.get(); + t.pass('No error when getting no messages') + t.ok(!msg, 'No msg received') + // now wait 4s + await new Promise(function(resolve) { + setTimeout(function() { + resolve() + }, 4 * 1000); + }); + }, + async function() { + // get something now and it SHOULD be there + var msg = await queue.get(); + t.pass('No error when getting a message') + t.ok(msg.id, 'Got a message id now that the message delay has passed') + await queue.ack(msg.ack) + }, + async function() { + var msg = await queue.get(); + // no more messages + t.pass('No error when getting no messages') + t.ok(!msg, 'No more messages') + }, + ], + function(err) { + if (err) t.fail(err) + t.pass('Finished test ok') + t.end() + } + ) + }) + + test('delay: check an individual message delay overrides the queue delay', function(t) { + var queue = mongoDbQueue(db, 'delay') + + async.series( + [ + async function() { + var id = await queue.add('I am delayed by 3 seconds', { delay : 3 }); + t.pass('There is no error when adding a message.') + t.ok(id, 'There is an id returned when adding a message.') + }, + async function() { + // get something now and it shouldn't be there + var msg = await queue.get(); + t.pass('No error when getting no messages') + t.ok(!msg, 'No msg received') + // now wait 4s + await new Promise(function(resolve) { + setTimeout(function() { + resolve() + }, 4 * 1000); + }); + }, + async function() { + // get something now and it SHOULD be there + var msg = await queue.get(); + t.pass('No error when getting a message') + t.ok(msg.id, 'Got a message id now that the message delay has passed') + await queue.ack(msg.ack) + }, + async function() { + var msg = await queue.get(); + // no more messages + t.pass('No error when getting no messages') + t.ok(!msg, 'No more messages') + }, + ], + function(err) { + if (err) t.fail(err) + t.pass('Finished test ok') + t.end() + } + ) + }) + + test('client.close()', function(t) { + t.pass('client.close()') + client.close() + t.end() + }) + +}) diff --git a/test/indexes-async.js b/test/indexes-async.js new file mode 100644 index 0000000..8a136f2 --- /dev/null +++ b/test/indexes-async.js @@ -0,0 +1,27 @@ +var async = require('async') +var test = require('tape') + +var setup = require('./setup.js') +var mongoDbQueue = require('../') + +setup(function(client, db) { + + test('visibility: check message is back in queue after 3s', async function(t) { + t.plan(2) + + var queue = mongoDbQueue(db, 'visibility', { visibility : 3 }) + + var indexName = await queue.createIndexes(); + t.pass('There was no error when running .ensureIndexes()') + console.log(indexName); + t.ok(indexName, 'receive indexName we created') + t.end() + }) + + test('client.close()', function(t) { + t.pass('client.close()') + client.close() + t.end() + }) + +}) diff --git a/test/many-async.js b/test/many-async.js new file mode 100644 index 0000000..721d568 --- /dev/null +++ b/test/many-async.js @@ -0,0 +1,74 @@ +var async = require('async') +var test = require('tape') + +var setup = require('./setup.js') +var mongoDbQueue = require('../') + +var total = 250 + +setup(function(client, db) { + + test('many: add ' + total + ' messages, get ' + total + ' back', function(t) { + var queue = mongoDbQueue(db, 'many') + var msgs = [] + var msgsToQueue = [] + + async.series( + [ + async function() { + var i + for(i=0; i Date: Fri, 12 Feb 2021 02:30:25 -0500 Subject: [PATCH 2/3] Adding support for promises --- mongodb-queue.js | 361 ++++++++++++++++++++++++++++------------------- package.json | 2 +- 2 files changed, 213 insertions(+), 150 deletions(-) diff --git a/mongodb-queue.js b/mongodb-queue.js index a712f1c..bfc0ada 100644 --- a/mongodb-queue.js +++ b/mongodb-queue.js @@ -51,80 +51,104 @@ function Queue(db, name, opts) { } } -Queue.prototype.createIndexes = function(callback) { - var self = this - - self.col.createIndex({ deleted : 1, visible : 1 }, function(err, indexname) { - if (err) return callback(err) - self.col.createIndex({ ack : 1 }, { unique : true, sparse : true }, function(err) { - if (err) return callback(err) - callback(null, indexname) - }) - }) +Queue.prototype.createIndexes = async function(callback) { + try { + var self = this + + var [indexname] = await Promise.all([ + self.col.createIndex({ deleted : 1, visible : 1 }), + self.col.createIndex({ ack : 1 }, { unique : true, sparse : true }) + ]); + + if(callback) return callback(null, indexname);; + return indexname; + } catch (err) { + if(callback) return callback(err); + throw err; + } } -Queue.prototype.add = function(payload, opts, callback) { - var self = this - if ( !callback ) { - callback = opts - opts = {} - } - var delay = opts.delay || self.delay - var visible = delay ? nowPlusSecs(delay) : now() - - var msgs = [] - if (payload instanceof Array) { - if (payload.length === 0) { - var errMsg = 'Queue.add(): Array payload length must be greater than 0' - return callback(new Error(errMsg)) +Queue.prototype.add = async function(payload, opts, callback) { + try { + var self = this + if ( !callback && typeof opts === 'function') { + callback = opts + opts = {} } - payload.forEach(function(payload) { + if (!opts) { + opts = {}; + } + var delay = opts.delay || self.delay + var visible = delay ? nowPlusSecs(delay) : now() + + var msgs = [] + if (payload instanceof Array) { + if (payload.length === 0) { + var errMsg = 'Queue.add(): Array payload length must be greater than 0' + if (callback) { + return callback(new Error(errMsg)) + } + throw new Error(errMsg); + } + payload.forEach(function(payload) { + msgs.push({ + visible : visible, + payload : payload, + }) + }) + } else { msgs.push({ visible : visible, payload : payload, }) - }) - } else { - msgs.push({ - visible : visible, - payload : payload, - }) - } + } - self.col.insertMany(msgs, function(err, results) { - if (err) return callback(err) - if (payload instanceof Array) return callback(null, '' + results.insertedIds) - callback(null, '' + results.ops[0]._id) - }) + var results = await self.col.insertMany(msgs); + if (callback) { + if (payload instanceof Array) return callback(null, '' + results.insertedIds) + return callback(null, '' + results.ops[0]._id) + } + if (payload instanceof Array) return '' + results.insertedIds; + return '' + results.ops[0]._id; + } catch (err) { + if(callback) return callback(err); + throw err; + } } -Queue.prototype.get = function(opts, callback) { - var self = this - if ( !callback ) { - callback = opts - opts = {} - } +Queue.prototype.get = async function(opts, callback) { + try { + var self = this + if ( !callback && typeof opts === 'function') { + callback = opts + opts = {} + } + if (!opts) { + opts = {}; + } - var visibility = opts.visibility || self.visibility - var query = { - deleted : null, - visible : { $lte : now() }, - } - var sort = { - _id : 1 - } - var update = { - $inc : { tries : 1 }, - $set : { - ack : id(), - visible : nowPlusSecs(visibility), + var visibility = opts.visibility || self.visibility + var query = { + deleted : null, + visible : { $lte : now() }, + } + var sort = { + _id : 1 + } + var update = { + $inc : { tries : 1 }, + $set : { + ack : id(), + visible : nowPlusSecs(visibility), + } } - } - self.col.findOneAndUpdate(query, update, { sort: sort, returnOriginal : false }, function(err, result) { - if (err) return callback(err) + var result = await self.col.findOneAndUpdate(query, update, { sort: sort, returnOriginal : false }); var msg = result.value - if (!msg) return callback() + if (!msg) { + if(callback) return callback(); + return; + } // convert to an external representation msg = { @@ -142,127 +166,166 @@ Queue.prototype.get = function(opts, callback) { // 1) add this message to the deadQueue // 2) ack this message from the regular queue // 3) call ourself to return a new message (if exists) - self.deadQueue.add(msg, function(err) { - if (err) return callback(err) - self.ack(msg.ack, function(err) { - if (err) return callback(err) - self.get(callback) - }) - }) - return + await self.deadQueue.add(msg); + await self.ack(msg.ack); + msg = await self.get(); + if (callback) return callback(null, msg); + return msg; } } - callback(null, msg) - }) + if(callback) return callback(null, msg) + return msg; + } catch (err) { + if(callback) return callback(err); + throw err; + } } -Queue.prototype.ping = function(ack, opts, callback) { - var self = this - if ( !callback ) { - callback = opts - opts = {} - } +Queue.prototype.ping = async function(ack, opts, callback) { + try { + var self = this + if ( !callback && typeof opts === 'function') { + callback = opts + opts = {} + } + if (!opts) { + opts = {}; + } - var visibility = opts.visibility || self.visibility - var query = { - ack : ack, - visible : { $gt : now() }, - deleted : null, - } - var update = { - $set : { - visible : nowPlusSecs(visibility) + var visibility = opts.visibility || self.visibility + var query = { + ack : ack, + visible : { $gt : now() }, + deleted : null, } - } - self.col.findOneAndUpdate(query, update, { returnOriginal : false }, function(err, msg, blah) { - if (err) return callback(err) + var update = { + $set : { + visible : nowPlusSecs(visibility) + } + } + var msg = await self.col.findOneAndUpdate(query, update, { returnOriginal : false }); if ( !msg.value ) { - return callback(new Error("Queue.ping(): Unidentified ack : " + ack)) + if (callback) return callback(new Error("Queue.ping(): Unidentified ack : " + ack)); + throw new Error("Queue.ping(): Unidentified ack : " + ack); + } + if (callback) return callback(null, '' + msg.value._id); + return '' + msg.value._id; + } catch (err) { + if(callback) { + return callback(err); } - callback(null, '' + msg.value._id) - }) + throw err; + } } -Queue.prototype.ack = function(ack, callback) { - var self = this +Queue.prototype.ack = async function(ack, callback) { + try { + var self = this - var query = { - ack : ack, - visible : { $gt : now() }, - deleted : null, - } - var update = { - $set : { - deleted : now(), + var query = { + ack : ack, + visible : { $gt : now() }, + deleted : null, } - } - self.col.findOneAndUpdate(query, update, { returnOriginal : false }, function(err, msg, blah) { - if (err) return callback(err) + var update = { + $set : { + deleted : now(), + } + } + var msg = await self.col.findOneAndUpdate(query, update, { returnOriginal : false }); if ( !msg.value ) { - return callback(new Error("Queue.ack(): Unidentified ack : " + ack)) + if (callback) return callback(new Error("Queue.ack(): Unidentified ack : " + ack)); + throw new Error("Queue.ack(): Unidentified ack : " + ack); } - callback(null, '' + msg.value._id) - }) + if (callback) return callback(null, '' + msg.value._id); + return '' + msg.value._id; + } catch(err) { + if(callback) callback(err); + throw err; + } } -Queue.prototype.clean = function(callback) { - var self = this +Queue.prototype.clean = async function(callback) { + try { + var self = this - var query = { - deleted : { $exists : true }, - } + var query = { + deleted : { $exists : true }, + } - self.col.deleteMany(query, callback) + var result = await self.col.deleteMany(query) + if(callback) return callback(null, result); + return result; + } catch(err) { + if(callback) callback(err); + throw err; + } } -Queue.prototype.total = function(callback) { - var self = this +Queue.prototype.total = async function(callback) { + try { + var self = this - self.col.countDocuments(function(err, count) { - if (err) return callback(err) - callback(null, count) - }) + var count = await self.col.countDocuments(); + if (callback) return callback(null, count); + return count; + } catch(err) { + if(callback) callback(err); + throw err; + } } -Queue.prototype.size = function(callback) { - var self = this +Queue.prototype.size = async function(callback) { + try { + var self = this - var query = { - deleted : null, - visible : { $lte : now() }, - } + var query = { + deleted : null, + visible : { $lte : now() }, + } - self.col.countDocuments(query, function(err, count) { - if (err) return callback(err) - callback(null, count) - }) + var count = await self.col.countDocuments(query); + if (callback) return callback(null, count); + return count; + } catch(err) { + if(callback) callback(err); + throw err; + } } -Queue.prototype.inFlight = function(callback) { - var self = this +Queue.prototype.inFlight = async function(callback) { + try { + var self = this - var query = { - ack : { $exists : true }, - visible : { $gt : now() }, - deleted : null, - } + var query = { + ack : { $exists : true }, + visible : { $gt : now() }, + deleted : null, + } - self.col.countDocuments(query, function(err, count) { - if (err) return callback(err) - callback(null, count) - }) + var count = await self.col.countDocuments(query); + if (callback) return callback(null, count); + return count; + } catch(err) { + if(callback) callback(err); + throw err; + } } -Queue.prototype.done = function(callback) { - var self = this +Queue.prototype.done = async function(callback) { + try { + var self = this - var query = { - deleted : { $exists : true }, - } + var query = { + deleted : { $exists : true }, + } - self.col.countDocuments(query, function(err, count) { - if (err) return callback(err) - callback(null, count) - }) + var count = await self.col.countDocuments(query); + if (callback) return callback(null, count); + return count; + } catch(err) { + if(callback) callback(err); + throw err; + } } diff --git a/package.json b/package.json index d2b77e9..238632f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mongodb-queue", - "version": "4.0.0", + "version": "5.0.0", "description": "Message queues which uses MongoDB.", "main": "mongodb-queue.js", "scripts": { From e49aede6c3182d83257e0a56ff44b65ccb569d61 Mon Sep 17 00:00:00 2001 From: Scott Saltsgaver Date: Fri, 12 Feb 2021 02:35:14 -0500 Subject: [PATCH 3/3] Adding support for promises --- README.md | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/README.md b/README.md index 57a7206..f326855 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,14 @@ queue.add('Hello, World!', (err, id) => { }) ``` +Using Promises + +```js +// Message with payload 'Hello, World!' added. +// 'id' is returned, useful for logging. +var id = await queue.add('Hello, World!'); +``` + Get a message from the queue: ```js @@ -53,6 +61,16 @@ queue.get((err, msg) => { }) ``` +Using Promises + +```js +var msg = await queue.get(); +console.log('msg.id=' + msg.id) +console.log('msg.ack=' + msg.ack) +console.log('msg.payload=' + msg.payload) // 'Hello, World!' +console.log('msg.tries=' + msg.tries) +``` + Ping a message to keep it's visibility open for long-running tasks ```js @@ -62,6 +80,14 @@ queue.ping(msg.ack, (err, id) => { }) ``` +Using Promises + +```js +// Visibility window now increased for this message id. +// 'id' is returned, useful for logging. +var id = await queue.ping(msg.ack); +``` + Ack a message (and remove it from the queue): ```js @@ -71,6 +97,14 @@ queue.ack(msg.ack, (err, id) => { }) ``` +Using Promises + +```js +// This msg removed from queue for this ack. +// The 'id' of the message is returned, useful for logging. +var id = await queue.ack(msg.ack); +``` + By default, all old messages - even processed ones - are left in MongoDB. This is so that you can go and analyse them if you want. However, you can call the following function to remove processed messages: @@ -81,6 +115,14 @@ queue.clean((err) => { }) ``` +Using Promises + +```js +// All processed (ie. acked) messages have been deleted +await queue.clean(); +}) +``` + And if you haven't already, you should call this to make sure indexes have been added in MongoDB. Of course, if you've called this once (in some kind one-off script) you don't need to call it in your program. Of course, check @@ -92,6 +134,13 @@ queue.createIndexes((err, indexName) => { }) ``` +Using Promises + +```js +// The indexes needed have been added to MongoDB. +var indexName = await queue.createIndexes(); +``` + ## Creating a Queue ## To create a queue, call the exported function with the `MongoClient`, the name