diff --git a/lib/internal/queue.js b/lib/internal/queue.js index 678cee1ba..e57ef14a5 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -59,6 +59,10 @@ export default function queue(worker, concurrency, payload) { }); task.callback.apply(task, args); + + if (args[0] != null) { + q.error(args[0], task.data); + } }); if (workers <= (q.concurrency - q.buffer) ) { @@ -83,6 +87,7 @@ export default function queue(worker, concurrency, payload) { buffer: concurrency / 4, empty: noop, drain: noop, + error: noop, started: false, paused: false, push: function (data, callback) { diff --git a/lib/queue.js b/lib/queue.js index 24b187e2f..ac1987af4 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -34,6 +34,8 @@ import queue from './internal/queue'; * from the `queue` is given to a `worker`. * @property {Function} drain - a callback that is called when the last item * from the `queue` has returned from the `worker`. + * @property {Function} error - a callback that is called when a task errors. + * Has the signature `function(error, task)`. * @property {boolean} paused - a boolean for determining whether the queue is * in a paused state. * @property {Function} pause - a function that pauses the processing of tasks diff --git a/mocha_test/queue.js b/mocha_test/queue.js index de21ef4df..8ba577066 100644 --- a/mocha_test/queue.js +++ b/mocha_test/queue.js @@ -155,6 +155,33 @@ describe('queue', function(){ }); }); + it('global error handler', function(done){ + var results = []; + + var q = async.queue(function (task, callback) { + callback(task.name === 'foo' ? new Error('fooError') : null); + }, 2); + + q.error = function(error, task) { + expect(error).to.exist; + expect(error.message).to.equal('fooError'); + expect(task.name).to.equal('foo'); + results.push('fooError'); + }; + + q.drain = function() { + expect(results).to.eql(['fooError', 'bar']); + done(); + }; + + q.push({name: 'foo'}); + + q.push({name: 'bar'}, function(error) { + expect(error).to.not.exist; + results.push('bar'); + }); + }); + // The original queue implementation allowed the concurrency to be changed only // on the same event loop during which a task was added to the queue. This // test attempts to be a more robust test.