From 34a6a652bd5ad1086c8324eaeb8fcfe2c08e8fbc Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Wed, 27 Mar 2019 11:13:11 -0400 Subject: [PATCH] Prevents attempts to reconnect with invalid brokers --- lib/kafkaClient.js | 15 +++++++++++++++ test/test.kafkaClient.js | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index cb1a5d1b..e15bb1db 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -815,6 +815,12 @@ KafkaClient.prototype.createBroker = function (host, port, longpolling) { self.deleteDisconnected(brokerWrapper); return; } + + if (!self.isValidBroker(s)) { + logger.debug(`${self.clientId} is not reconnecting to ${s.addr} invalid broker`); + return; + } + logger.debug(`${self.clientId} reconnecting to ${s.addr}`); self.reconnectBroker(s); }, 1000); @@ -822,6 +828,15 @@ KafkaClient.prototype.createBroker = function (host, port, longpolling) { return brokerWrapper; }; +KafkaClient.prototype.isValidBroker = function ({ host, port }) { + return ( + this.connecting || + _(this.brokerMetadata) + .values() + .some({ host, port }) + ); +}; + KafkaClient.prototype.deleteDisconnected = function (broker) { if (!broker.isConnected()) { const brokers = this.getBrokers(broker.socket.longpolling); diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index ee3a0827..b51be429 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -204,6 +204,38 @@ describe('Kafka Client', function () { sinon.assert.notCalled(client.emit); }); + it('should not reconnect when broker is no longer valid', function () { + sandbox.useFakeTimers(); + const client = new Client({ autoConnect: false }); + client.brokerMetadata = { + '1001': { + host: 'localhost', + port: 9092 + }, + '1002': { + host: 'kafkaServer', + port: 9092 + } + }; + sandbox.stub(client, 'reconnectBroker'); + client.createBroker('fakehost', 9092, true); + mockSocket.emit('end'); + sandbox.clock.tick(1000); + sinon.assert.notCalled(client.reconnectBroker); + }); + + it('should try reconnecting when client is initializing', function () { + sandbox.useFakeTimers(); + const client = new Client({ autoConnect: false }); + client.connecting = true; + client.brokerMetadata = {}; + sandbox.stub(client, 'reconnectBroker'); + client.createBroker('fakehost', 9092, true); + mockSocket.emit('end'); + sandbox.clock.tick(1000); + sinon.assert.calledOnce(client.reconnectBroker); + }); + it('should schedule refresh of metadata when socket is closed', function (done) { const client = new Client({ autoConnect: false }); sandbox.stub(client, 'refreshBrokerMetadata').callsFake(done);