Merge pull request #187 from haio/patch
Producer support `requireAcks` option
haio committed Apr 1, 2015
2 parents 7101c4e + 8419b9d commit 0271c8f
Showing 8 changed files with 55 additions and 22 deletions.
14 changes: 9 additions & 5 deletions CHANGELOG.md
@@ -1,14 +1,18 @@
 # kafka-node CHANGELOG

+## 2015-04-01, Version 0.2.25
+- Producer support `requireAcks` option [#187](https://github.com/SOHU-Co/kafka-node/pull/187)
+- Update examples [#185](https://github.com/SOHU-Co/kafka-node/pull/185)
+
 ## 2015-03-20, Version 0.2.24
 - Bump deps
-- Refresh metadata after auto rebalance among brokers #180
-- Initialize partition owner with consumerId #178
+- Refresh metadata after auto rebalance among brokers [#180](https://github.com/SOHU-Co/kafka-node/pull/180)
+- Initialize partition owner with consumerId [#178](https://github.com/SOHU-Co/kafka-node/pull/178)

 ## 2015-03-17, Version 0.2.23
-- Fix #175: Refresh topic metadata in Producer when broker change
+- Fix [#175](https://github.com/SOHU-Co/kafka-node/issues/175): Refresh topic metadata in Producer when broker change
 - Refactor Client#refreshMetadata method
 - Add the missing semicolons, no offense, just keep style.
-- Fix #170: In case of `offsetOutOfRange`, the consumer should be paused.
-- Fix #169: When paused why try to fetch every 1000 ms?
+- Fix [#170](https://github.com/SOHU-Co/kafka-node/issues/170): In case of `offsetOutOfRange`, the consumer should be paused.
+- Fix [#169](https://github.com/SOHU-Co/kafka-node/issues/169): When paused why try to fetch every 1000 ms?
 - Ref: remove unused variables.
6 changes: 4 additions & 2 deletions README.md
@@ -27,8 +27,9 @@ Closes the connection to Zookeeper and the brokers so that the node process can
 * `cb`: **Function**, the callback

 ## Producer
-### Producer(client)
+### Producer(client, [options])
 * `client`: client which keeps a connection with the Kafka server.
+* `options`: set `requireAcks` and `ackTimeoutMs` for producer, the default value is `{requireAcks: 1, ackTimeoutMs: 100}`

 ``` js
 var kafka = require('kafka-node'),
@@ -104,8 +105,9 @@ producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg


 ## HighLevelProducer
-### HighLevelProducer(client)
+### HighLevelProducer(client, [options])
 * `client`: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions
+* `options`: set `requireAcks` and `ackTimeoutMs` for producer, the default value is `{requireAcks: 1, ackTimeoutMs: 100}`

 ``` js
 var kafka = require('kafka-node'),
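A minimal usage sketch of the new constructor signature (the ZooKeeper connection string, topic, and message below are illustrative, not part of the diff):

``` js
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client('localhost:2181');

// Omitting the second argument keeps the defaults { requireAcks: 1, ackTimeoutMs: 100 }.
var producer = new Producer(client, { requireAcks: 1, ackTimeoutMs: 100 });

producer.on('ready', function () {
    producer.send([{ topic: 'topic1', messages: 'hello kafka' }], function (err, data) {
        if (err) return console.error(err);
        console.log(data); // per-topic/partition offsets, since an ack was requested
    });
});
```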
2 changes: 1 addition & 1 deletion example/producer.js
@@ -7,7 +7,7 @@ var argv = require('optimist').argv;
 var topic = argv.topic || 'topic1';
 var p = argv.p || 0;
 var a = argv.a || 0;
-var producer = new Producer(client);
+var producer = new Producer(client, { requireAcks: 1 });

 producer.on('ready', function () {
   var message = 'a message';
20 changes: 14 additions & 6 deletions lib/client.js
@@ -133,9 +133,11 @@ Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs
 };

 Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) {
-  var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs),
-      decoder = protocol.decodeProduceResponse,
-      self = this;
+  var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs);
+  var decoder = protocol.decodeProduceResponse;
+  var self = this;

+  decoder.requireAcks = requireAcks;
+
   async.each(payloads, buildRequest, function (err) {
     if (err) return cb(err);
@@ -221,7 +223,7 @@ Client.prototype.loadMetadataForTopics = function (topics, cb) {
   }

   this.queueCallback(broker, correlationId, [protocol.decodeMetadataResponse, cb]);
-  broker && broker.write(request);
+  broker.write(request);
 };

 Client.prototype.createTopics = function (topics, isAsync, cb) {
@@ -415,8 +417,14 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
       if (broker.waitting) continue;
       broker.waitting = true;
     }
-    this.queueCallback(broker, correlationId, [decoder, cb]);
-    broker && broker.write(request);
+
+    if (decoder.requireAcks == 0) {
+      broker.write(request);
+      cb(null, { result: 'no ack' });
+    } else {
+      this.queueCallback(broker, correlationId, [decoder, cb]);
+      broker.write(request);
+    }
   }
 };
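For context on the `sendToBroker` change: with `requireAcks: 0` a Kafka broker sends no produce response, so there is no correlation callback to queue; the request is written and the callback is invoked immediately with the synthetic `{ result: 'no ack' }` payload. Setting `decoder.requireAcks` in `sendProduceRequest` is simply how that flag reaches the shared `sendToBroker` path without changing its signature. A short caller-side sketch (connection string and topic are illustrative):

``` js
var kafka = require('kafka-node'),
    client = new kafka.Client('localhost:2181'),
    noAckProducer = new kafka.Producer(client, { requireAcks: 0 });

noAckProducer.on('ready', function () {
    noAckProducer.send([{ topic: 'topic1', messages: 'fire and forget' }], function (err, data) {
        // Fires right after the socket write; no broker offsets are available in this mode.
        console.log(data); // { result: 'no ack' }
    });
});
```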
10 changes: 7 additions & 3 deletions lib/highLevelProducer.js
@@ -29,13 +29,17 @@ var util = require('util'),
  * @constructor
  */
 var HighLevelProducer = function (client, options) {
-  var useOptions = options || {};
+  options = options || {};

   this.ready = false;
   this.client = client;

-  this.requireAcks = useOptions.requireAcks || DEFAULTS.requireAcks
-  this.ackTimeoutMs = useOptions.ackTimeoutMs || DEFAULTS.ackTimeoutMs
+  this.requireAcks = options.requireAcks === undefined
+    ? DEFAULTS.requireAcks
+    : options.requireAcks;
+  this.ackTimeoutMs = options.ackTimeoutMs === undefined
+    ? DEFAULTS.ackTimeoutMs
+    : options.ackTimeoutMs;

   this.connect();
 };
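The switch from `||` to an explicit `undefined` check matters because `requireAcks: 0` is falsy and would otherwise be overwritten by the default; a minimal sketch of the difference (the same pattern appears in `lib/producer.js` below):

``` js
var DEFAULTS = { requireAcks: 1, ackTimeoutMs: 100 };
var options = { requireAcks: 0 };

// Old pattern: 0 is falsy, so the caller's explicit choice is silently replaced by the default.
var withOr = options.requireAcks || DEFAULTS.requireAcks;      // 1

// New pattern: fall back only when the option was not supplied at all.
var withCheck = options.requireAcks === undefined
    ? DEFAULTS.requireAcks
    : options.requireAcks;                                     // 0
```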
10 changes: 7 additions & 3 deletions lib/producer.js
@@ -30,13 +30,17 @@ var util = require('util'),
  * @constructor
  */
 var Producer = function (client, options) {
-  var useOptions = options || {};
+  options = options || {};

   this.ready = false;
   this.client = client;

-  this.requireAcks = useOptions.requireAcks || DEFAULTS.requireAcks
-  this.ackTimeoutMs = useOptions.ackTimeoutMs || DEFAULTS.ackTimeoutMs
+  this.requireAcks = options.requireAcks === undefined
+    ? DEFAULTS.requireAcks
+    : options.requireAcks;
+  this.ackTimeoutMs = options.ackTimeoutMs === undefined
+    ? DEFAULTS.ackTimeoutMs
+    : options.ackTimeoutMs;

   this.connect();
 };
2 changes: 1 addition & 1 deletion package.json
@@ -1,7 +1,7 @@
{
"name": "kafka-node",
"description": "node client for Apache kafka, only support kafka 0.8 and above",
"version": "0.2.24",
"version": "0.2.25",
"main": "kafka.js",
"dependencies": {
"async": "~0.9.0",
13 changes: 12 additions & 1 deletion test/test.producer.js
@@ -5,7 +5,7 @@ var kafka = require('..'),
     Client = kafka.Client,
     KeyedMessage = kafka.KeyedMessage;

-var client, producer;
+var client, producer, noAckProducer;

 var TOPIC_POSTFIX = '_test_' + Date.now();
 var EXISTS_TOPIC_3 = '_exists_3' + TOPIC_POSTFIX;
@@ -26,6 +26,7 @@ before(function (done) {
       setTimeout(done, 500);
     });
   });
+  noAckProducer = new Producer(client, { requireAcks: 0 });
 });

 describe('Producer', function () {
@@ -116,6 +117,16 @@ describe('Producer', function () {
         done();
       });
     });
+
+    it('should send message without ack', function (done) {
+      noAckProducer.send([{
+        topic: EXISTS_TOPIC_3, messages: 'hello kafka'
+      }], function (err, message) {
+        if (err) return done(err);
+        message.result.should.equal('no ack');
+        done();
+      });
+    })
   });

   describe('#createTopics', function () {
