Skip to content

Commit

Permalink
Refresh metadata after auto rebalance among brokers
Browse files Browse the repository at this point in the history
This should fix issue SOHU-Co#175. When the client get `NotLeaderForPartition`
error, that means the leader for the partition has changed, so it emit a
`brokersChanged` event, consumer and producer listen this event and
refresh topic metadata.
  • Loading branch information
haio committed Mar 19, 2015
1 parent 541faa5 commit 704ae54
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 19 deletions.
29 changes: 22 additions & 7 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,14 @@ Client.prototype.closeBrokers = function (brokers) {
};

Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes),
decoder = protocol.decodeFetchResponse(function (err, type, message) {
var self = this;
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
var decoder = protocol.decodeFetchResponse(function (err, type, message) {
if (err) {
if (err.message === 'OffsetOutOfRange') {
return consumer.emit('offsetOutOfRange', err);
} else if (err.message === 'NotLeaderForPartition') {
return self.emit('brokersChanged');
}

return consumer.emit('error', err);
Expand Down Expand Up @@ -136,7 +139,16 @@ Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeou

async.each(payloads, buildRequest, function (err) {
if (err) return cb(err);
self.send(payloads, encoder, decoder, cb);
self.send(payloads, encoder, decoder, function (err, result) {
if (err) {
if (err.message === 'NotLeaderForPartition') {
self.emit('brokersChanged');
}
cb(err);
} else {
cb(null, result);
}
});
});

function buildRequest (payload, cb) {
Expand Down Expand Up @@ -343,7 +355,7 @@ Client.prototype.refreshMetadata = function (topicNames, cb) {
return;
}
if (err) {
debug('refresh metadta error', err.message)
debug('refresh metadata error', err.message)
return cb(err);
}
self.updateMetadatas(resp);
Expand Down Expand Up @@ -535,9 +547,12 @@ Client.prototype.handleReceivedData = function (socket) {
var handlers = this.unqueueCallback(socket, correlationId);

if (!handlers) return;
var decoder = handlers[0],
cb = handlers[1];
cb.call(this, null, decoder(resp));
var decoder = handlers[0];
var cb = handlers[1];
var result = decoder(resp);
(result instanceof Error)
? cb.call(this, result)
: cb.call(this, null, result);
socket.buffer = socket.buffer.slice(size);
if (socket.longpolling) socket.waitting = false;
} else { return }
Expand Down
14 changes: 5 additions & 9 deletions lib/highLevelConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ HighLevelConsumer.prototype.connect = function () {
});

// Wait for the consumer to be ready
this.on('ready', function () {
this.on('rebalanced', function () {
self.fetchOffset(self.topicPayloads, function (err, topics) {
if (err) {
return self.emit('error', err);
Expand All @@ -114,7 +114,6 @@ HighLevelConsumer.prototype.connect = function () {
self.ready = true;
self.updateOffsets(topics, true);
self.fetch();
self.emit('rebalanced');
});
});

Expand Down Expand Up @@ -149,9 +148,7 @@ HighLevelConsumer.prototype.connect = function () {

self.rebalancing = true;
// Nasty hack to retry 3 times to re-balance - TBD fix this
var oldTopicPayloads = JSON.parse(JSON.stringify(self.topicPayloads));
self.topicPayloads = [];

var oldTopicPayloads = self.topicPayloads;
var operation = retry.operation({
retries: 10,
factor: 2,
Expand Down Expand Up @@ -179,7 +176,7 @@ HighLevelConsumer.prototype.connect = function () {
if (err) {
self.emit('error', err);
} else {
self.emit('ready');
self.emit('rebalanced');
}
});
}
Expand All @@ -197,14 +194,14 @@ HighLevelConsumer.prototype.connect = function () {
debug("Registered listeners");
// Register for re-balances (broker or consumer changes)
self.client.zk.on('consumersChanged', rebalance);
self.client.zk.on('brokersChanged', rebalance);
self.client.on('brokersChanged', rebalance);
}

function deregister() {
debug("Deregistered listeners");
// Register for re-balances (broker or consumer changes)
self.client.zk.removeListener('consumersChanged', rebalance);
self.client.zk.removeListener('brokersChanged', rebalance);
self.client.removeListener('brokersChanged', rebalance);
}

this.client.zk.on('error', function (err) {
Expand Down Expand Up @@ -567,7 +564,6 @@ HighLevelConsumer.prototype.close = function (force, cb) {
};

HighLevelConsumer.prototype.stop = function (cb) {
this.ready = false;
if (!this.options.autoCommit) return cb && cb();
this.commit(true, function (err) {
cb && cb();
Expand Down
11 changes: 8 additions & 3 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ function _decodeFetchResponse(resp, cb, maxTickMessages) {
.tap(function (vars) {
this.buffer('messageSet', vars.messageSetSize)
if (vars.errorCode !== 0)
cb({ topic: vars.topic, partition: vars.partition, message: ERROR_CODE[vars.errorCode] });
return cb({ topic: vars.topic, partition: vars.partition, message: ERROR_CODE[vars.errorCode] });
var messageSet = decodeMessageSet(vars.topic, vars.partition, vars.messageSet, cb, maxTickMessages);
if (messageSet.length) {
var offset = messageSet[messageSet.length-1];
Expand Down Expand Up @@ -392,6 +392,7 @@ function encodeMessage(message) {

function decodeProduceResponse(resp) {
var topics = {};
var error;
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
Expand All @@ -405,10 +406,14 @@ function decodeProduceResponse(resp) {
.word16bs('errorCode')
.word64bs('offset')
.tap(function (vars) {
topics[vars.topic][vars.partition] = vars.offset;
if (vars.errorCode) {
error = new Error(ERROR_CODE[vars.errorCode]);
} else {
topics[vars.topic][vars.partition] = vars.offset;
}
});
}
return topics;
return error || topics;
}

function encodeOffsetFetchRequest(group) {
Expand Down

0 comments on commit 704ae54

Please # to comment.