Skip to content

Commit

Permalink
Fix topic partition check not being scheduled when loadMetadataForTop…
Browse files Browse the repository at this point in the history
…ics fails (SOHU-Co#1195)
  • Loading branch information
hyperlink authored and KeeReal committed Apr 11, 2019
1 parent 218e837 commit bb3f362
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
4 changes: 3 additions & 1 deletion lib/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,15 @@ function createTopicPartitionLength (metadata, emptyTopics) {
}

ConsumerGroup.prototype.scheduleTopicPartitionCheck = function () {
if (this.isLeader && !this.topicPartitionCheckTimer) {
if (this.isLeader && !this.topicPartitionCheckTimer && !this.closed) {
logger.debug(`${this.client.clientId} is leader scheduled new topic/partition check`);
this.topicPartitionCheckTimer = setTimeout(() => {
this.topicPartitionCheckTimer = null;
if (this.closed) return;
logger.debug('checking for new topics and partitions');
this._checkTopicPartitionChange((error, changed) => {
if (error) {
this.scheduleTopicPartitionCheck();
return this.emit('error', new NestedError('topic/partition change check failed', error));
}

Expand Down
56 changes: 56 additions & 0 deletions test/test.consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,62 @@ describe('ConsumerGroup', function () {
clock = sandbox.useFakeTimers();
});

it('should not schedule check if consumer is closed', function () {
const cgMock = sandbox.mock(consumerGroup);

consumerGroup.closed = true;
consumerGroup.isLeader = true;

cgMock.expects('_checkTopicPartitionChange').never();
cgMock.expects('commit').never();
cgMock.expects('leaveGroup').never();
cgMock.expects('connect').never();
consumerGroup.scheduleTopicPartitionCheck();
clock.tick(30000);

cgMock.verify();
});

it('should not run check if consumer is closed and scheduled', function () {
const cgMock = sandbox.mock(consumerGroup);

consumerGroup.closed = false;
consumerGroup.isLeader = true;

cgMock.expects('_checkTopicPartitionChange').never();
cgMock.expects('commit').never();
cgMock.expects('leaveGroup').never();
cgMock.expects('connect').never();
consumerGroup.scheduleTopicPartitionCheck();
consumerGroup.closed = true;
clock.tick(30000);

cgMock.verify();
});

it('should still schedule check if topicPartitionChange errors', function () {
const cgMock = sandbox.mock(consumerGroup);
consumerGroup.isLeader = true;
cgMock
.expects('_checkTopicPartitionChange')
.once()
.yields(new Error('something bad'));

cgMock.expects('commit').never();
cgMock.expects('leaveGroup').never();
cgMock.expects('connect').never();

consumerGroup.on('error', function () {});

consumerGroup.scheduleTopicPartitionCheck();
sandbox.spy(consumerGroup, 'scheduleTopicPartitionCheck');

clock.tick(30000);

cgMock.verify();
sinon.assert.calledOnce(consumerGroup.scheduleTopicPartitionCheck);
});

it('should only have one schedule pending', function () {
const cgMock = sandbox.mock(consumerGroup);
consumerGroup.isLeader = true;
Expand Down

0 comments on commit bb3f362

Please # to comment.