diff --git a/CHANGELOG.md b/CHANGELOG.md index 2178acbb..31b225ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,14 +1,18 @@ # kafka-node CHANGELOG +## 2015-04-01, Version 0.2.25 +- Producer support `requireAcks` option [#187](https://github.com/SOHU-Co/kafka-node/pull/187) +- Update examples [#185](https://github.com/SOHU-Co/kafka-node/pull/185) + ## 2015-03-20, Version 0.2.24 - Bump deps -- Refresh metadata after auto rebalance among brokers #180 -- Initialize partition owner with consumerId #178 +- Refresh metadata after auto rebalance among brokers [#180](https://github.com/SOHU-Co/kafka-node/pull/180) +- Initialize partition owner with consumerId [#178](https://github.com/SOHU-Co/kafka-node/pull/178) ## 2015-03-17, Version 0.2.23 -- Fix #175: Refresh topic metadata in Producer when broker change +- Fix [#175](https://github.com/SOHU-Co/kafka-node/issues/175): Refresh topic metadata in Producer when broker change - Refactor Client#refreshMetadata method - Add the missing semicolons, no offense, just keep style. -- Fix #170: In case of `offsetOutOfRange`, the consumer should be paused. -- Fix #169: When paused why try to fetch every 1000 ms? +- Fix [#170](https://github.com/SOHU-Co/kafka-node/issues/170): In case of `offsetOutOfRange`, the consumer should be paused. +- Fix [#169](https://github.com/SOHU-Co/kafka-node/issues/169): When paused why try to fetch every 1000 ms? - Ref: remove unused variables. diff --git a/README.md b/README.md index 68141197..cb4f9d1a 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,9 @@ Closes the connection to Zookeeper and the brokers so that the node process can * `cb`: **Function**, the callback ## Producer -### Producer(client) +### Producer(client, [options]) * `client`: client which keeps a connection with the Kafka server. +* `options`: set `requireAcks` and `ackTimeoutMs` for producer, the default value is `{requireAcks: 1, ackTimeoutMs: 100}` ``` js var kafka = require('kafka-node'), @@ -104,8 +105,9 @@ producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg ## HighLevelProducer -### HighLevelProducer(client) +### HighLevelProducer(client, [options]) * `client`: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions +* `options`: set `requireAcks` and `ackTimeoutMs` for producer, the default value is `{requireAcks: 1, ackTimeoutMs: 100}` ``` js var kafka = require('kafka-node'), diff --git a/example/producer.js b/example/producer.js index 7bbbe2e4..e7b8b74b 100644 --- a/example/producer.js +++ b/example/producer.js @@ -7,7 +7,7 @@ var argv = require('optimist').argv; var topic = argv.topic || 'topic1'; var p = argv.p || 0; var a = argv.a || 0; -var producer = new Producer(client); +var producer = new Producer(client, { requireAcks: 1 }); producer.on('ready', function () { var message = 'a message'; diff --git a/lib/client.js b/lib/client.js index 51c9b51b..82513126 100644 --- a/lib/client.js +++ b/lib/client.js @@ -133,9 +133,11 @@ Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs }; Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) { - var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs), - decoder = protocol.decodeProduceResponse, - self = this; + var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs); + var decoder = protocol.decodeProduceResponse; + var self = this; + + decoder.requireAcks = requireAcks; async.each(payloads, buildRequest, function (err) { if (err) return cb(err); @@ -221,7 +223,7 @@ Client.prototype.loadMetadataForTopics = function (topics, cb) { } this.queueCallback(broker, correlationId, [protocol.decodeMetadataResponse, cb]); - broker && broker.write(request); + broker.write(request); }; Client.prototype.createTopics = function (topics, isAsync, cb) { @@ -415,8 +417,14 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) { if (broker.waitting) continue; broker.waitting = true; } - this.queueCallback(broker, correlationId, [decoder, cb]); - broker && broker.write(request); + + if (decoder.requireAcks == 0) { + broker.write(request); + cb(null, { result: 'no ack' }); + } else { + this.queueCallback(broker, correlationId, [decoder, cb]); + broker.write(request); + } } }; diff --git a/lib/highLevelProducer.js b/lib/highLevelProducer.js index 8b7f38df..792c06bb 100644 --- a/lib/highLevelProducer.js +++ b/lib/highLevelProducer.js @@ -29,13 +29,17 @@ var util = require('util'), * @constructor */ var HighLevelProducer = function (client, options) { - var useOptions = options || {}; + options = options || {}; this.ready = false; this.client = client; - this.requireAcks = useOptions.requireAcks || DEFAULTS.requireAcks - this.ackTimeoutMs = useOptions.ackTimeoutMs || DEFAULTS.ackTimeoutMs + this.requireAcks = options.requireAcks === undefined + ? DEFAULTS.requireAcks + : options.requireAcks; + this.ackTimeoutMs = options.ackTimeoutMs === undefined + ? DEFAULTS.ackTimeoutMs + : options.ackTimeoutMs; this.connect(); }; diff --git a/lib/producer.js b/lib/producer.js index 468fa6bd..ce692d5a 100644 --- a/lib/producer.js +++ b/lib/producer.js @@ -30,13 +30,17 @@ var util = require('util'), * @constructor */ var Producer = function (client, options) { - var useOptions = options || {}; + options = options || {}; this.ready = false; this.client = client; - this.requireAcks = useOptions.requireAcks || DEFAULTS.requireAcks - this.ackTimeoutMs = useOptions.ackTimeoutMs || DEFAULTS.ackTimeoutMs + this.requireAcks = options.requireAcks === undefined + ? DEFAULTS.requireAcks + : options.requireAcks; + this.ackTimeoutMs = options.ackTimeoutMs === undefined + ? DEFAULTS.ackTimeoutMs + : options.ackTimeoutMs; this.connect(); }; diff --git a/package.json b/package.json index 5d359668..4815686a 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "kafka-node", "description": "node client for Apache kafka, only support kafka 0.8 and above", - "version": "0.2.24", + "version": "0.2.25", "main": "kafka.js", "dependencies": { "async": "~0.9.0", diff --git a/test/test.producer.js b/test/test.producer.js index 6af141a7..9db6fa7d 100644 --- a/test/test.producer.js +++ b/test/test.producer.js @@ -5,7 +5,7 @@ var kafka = require('..'), Client = kafka.Client, KeyedMessage = kafka.KeyedMessage; -var client, producer; +var client, producer, noAckProducer; var TOPIC_POSTFIX = '_test_' + Date.now(); var EXISTS_TOPIC_3 = '_exists_3' + TOPIC_POSTFIX; @@ -26,6 +26,7 @@ before(function (done) { setTimeout(done, 500); }); }); + noAckProducer = new Producer(client, { requireAcks: 0 }); }); describe('Producer', function () { @@ -116,6 +117,16 @@ describe('Producer', function () { done(); }); }); + + it('should send message without ack', function (done) { + noAckProducer.send([{ + topic: EXISTS_TOPIC_3, messages: 'hello kafka' + }], function (err, message) { + if (err) return done(err); + message.result.should.equal('no ack'); + done(); + }); + }) }); describe('#createTopics', function () {