Skip to content

Commit

Permalink
Fixed issue where closed broker could be used to send metadata request
Browse files Browse the repository at this point in the history
…closes #995 (#1160)
  • Loading branch information
hyperlink authored Jan 7, 2019
1 parent 2340ca2 commit 69ef92b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 14 deletions.
32 changes: 18 additions & 14 deletions lib/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ KafkaClient.prototype.loadMetadata = function (callback) {
* @param {loadMetadataCallback} callback Function to call once metadata is loaded.
*/
KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) {
var broker = this.brokerForLeader();
const broker = this.brokerForLeader();

if (!broker || !broker.isConnected()) {
return callback(new errors.BrokerNotAvailableError('Broker not available'));
Expand All @@ -870,21 +870,25 @@ KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) {
}
};

async.series([
cb => {
ensureBrokerReady(broker, cb);
},
cb => {
var correlationId = this.nextId();
var supportedCoders = getSupportedForRequestType(broker, 'metadata');
var request = supportedCoders.encoder(this.clientId, correlationId, topics);
async.series(
[
cb => {
ensureBrokerReady(broker, cb);
},
cb => {
const broker = this.brokerForLeader();
const correlationId = this.nextId();
const supportedCoders = getSupportedForRequestType(broker, 'metadata');
const request = supportedCoders.encoder(this.clientId, correlationId, topics);

this.queueCallback(broker.socket, correlationId, [supportedCoders.decoder, cb]);
broker.write(request);
this.queueCallback(broker.socket, correlationId, [supportedCoders.decoder, cb]);
broker.write(request);
}
],
(err, result) => {
callback(err, result[1]);
}
], (err, result) => {
callback(err, result[1]);
});
);
};

/**
Expand Down
43 changes: 43 additions & 0 deletions test/test.kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,49 @@ describe('Kafka Client', function () {
});
});

describe('#loadMetadataForTopics', function () {
it('should request metadata from correct broker after ready', function (done) {
const client = new Client({ autoConnect: false });
const brokerAddr = uuid.v4();

const brokerForLeaderStub = sinon.stub(client, 'brokerForLeader');
sinon.spy(client, 'waitUntilReady');

const firstBroker = new BrokerWrapper(new FakeSocket());
const secondBroker = new BrokerWrapper(new FakeSocket());

firstBroker.socket.addr = brokerAddr;
secondBroker.socket.addr = brokerAddr;

brokerForLeaderStub.onFirstCall().returns(firstBroker);
brokerForLeaderStub.onSecondCall().returns(secondBroker);

client.connecting = true;
client.loadMetadataForTopics([], function (error, result) {
if (error) {
return done(error);
}
sinon.assert.calledTwice(brokerForLeaderStub);
done(null);
});

firstBroker.socket.destroyed = true;
secondBroker.apiSupport = {
metadata: {
usable: 0
}
};

sinon.stub(client, 'queueCallback').callsFake(function (socket, correlationId, coderAndCb) {
setImmediate(function () {
coderAndCb[1](null);
});
});

client.emit(firstBroker.getReadyEventName());
});
});

describe('#sendControllerRequest', function () {
let client, sandbox;

Expand Down

0 comments on commit 69ef92b

Please # to comment.