Skip to content

Commit

Permalink
Fixes #554 argument is out of bounds exception (#556)
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperlink authored Jan 12, 2017
1 parent b9bef30 commit 9bcfb9d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
5 changes: 5 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ var debug = require('debug')('kafka-node:Client');
var validateConfig = require('./utils').validateConfig;
var validateKafkaTopics = require('./utils').validateTopicNames;

const MAX_INT32 = 2147483647;

/**
* Communicates with kafka brokers
* Uses zookeeper to discover all the kafka brokers to connect to
Expand Down Expand Up @@ -445,6 +447,9 @@ Client.prototype.addTopics = function (topics, cb) {
};

Client.prototype.nextId = function () {
if (this.correlationId >= MAX_INT32) {
this.correlationId = 0;
}
return this.correlationId++;
};

Expand Down
25 changes: 25 additions & 0 deletions test/test.consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,31 @@ describe('ConsumerGroup', function () {
});
});

describe('Long running fetches', function () {
let consumerGroup;

beforeEach(function (done) {
consumerGroup = new ConsumerGroup({
host: host,
groupId: 'longFetchSimulation'
}, 'TestTopic');
consumerGroup.once('connect', done);
});

afterEach(function (done) {
consumerGroup.close(done);
});

it('should not throw out of bounds', function (done) {
should.doesNotThrow(function () {
consumerGroup.pause();
consumerGroup.client.correlationId = 2147483647;
consumerGroup.resume();
setImmediate(done);
});
});
});

describe('Sending Heartbeats', function () {
var consumerGroup, sandbox;

Expand Down

0 comments on commit 9bcfb9d

Please # to comment.