Skip to content

Commit

Permalink
Merge pull request #1170 from jnordberg/queue-error
Browse files Browse the repository at this point in the history
Add optional error handler to queues
  • Loading branch information
aearly committed May 31, 2016
2 parents f97c154 + 6c3a2ef commit 4c78fc2
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 0 deletions.
5 changes: 5 additions & 0 deletions lib/internal/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ export default function queue(worker, concurrency, payload) {
});

task.callback.apply(task, args);

if (args[0] != null) {
q.error(args[0], task.data);
}
});

if (workers <= (q.concurrency - q.buffer) ) {
Expand All @@ -83,6 +87,7 @@ export default function queue(worker, concurrency, payload) {
buffer: concurrency / 4,
empty: noop,
drain: noop,
error: noop,
started: false,
paused: false,
push: function (data, callback) {
Expand Down
2 changes: 2 additions & 0 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import queue from './internal/queue';
* from the `queue` is given to a `worker`.
* @property {Function} drain - a callback that is called when the last item
* from the `queue` has returned from the `worker`.
* @property {Function} error - a callback that is called when a task errors.
* Has the signature `function(error, task)`.
* @property {boolean} paused - a boolean for determining whether the queue is
* in a paused state.
* @property {Function} pause - a function that pauses the processing of tasks
Expand Down
27 changes: 27 additions & 0 deletions mocha_test/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,33 @@ describe('queue', function(){
});
});

it('global error handler', function(done){
var results = [];

var q = async.queue(function (task, callback) {
callback(task.name === 'foo' ? new Error('fooError') : null);
}, 2);

q.error = function(error, task) {
expect(error).to.exist;
expect(error.message).to.equal('fooError');
expect(task.name).to.equal('foo');
results.push('fooError');
};

q.drain = function() {
expect(results).to.eql(['fooError', 'bar']);
done();
};

q.push({name: 'foo'});

q.push({name: 'bar'}, function(error) {
expect(error).to.not.exist;
results.push('bar');
});
});

// The original queue implementation allowed the concurrency to be changed only
// on the same event loop during which a task was added to the queue. This
// test attempts to be a more robust test.
Expand Down

0 comments on commit 4c78fc2

Please # to comment.