Producer support requireAcks option #187

Merged · 2 commits · Apr 1, 2015
14 changes: 9 additions & 5 deletions CHANGELOG.md
@@ -1,14 +1,18 @@
 # kafka-node CHANGELOG
 
+## 2015-04-01, Version 0.2.25
+- Producer support `requireAcks` option [#187](https://github.com/SOHU-Co/kafka-node/pull/187)
+- Update examples [#185](https://github.com/SOHU-Co/kafka-node/pull/185)
+
 ## 2015-03-20, Version 0.2.24
 - Bump deps
-- Refresh metadata after auto rebalance among brokers #180
-- Initialize partition owner with consumerId #178
+- Refresh metadata after auto rebalance among brokers [#180](https://github.com/SOHU-Co/kafka-node/pull/180)
+- Initialize partition owner with consumerId [#178](https://github.com/SOHU-Co/kafka-node/pull/178)
 
 ## 2015-03-17, Version 0.2.23
-- Fix #175: Refresh topic metadata in Producer when broker change
+- Fix [#175](https://github.com/SOHU-Co/kafka-node/issues/175): Refresh topic metadata in Producer when broker change
 - Refactor Client#refreshMetadata method
 - Add the missing semicolons, no offense, just keep style.
-- Fix #170: In case of `offsetOutOfRange`, the consumer should be paused.
-- Fix #169: When paused why try to fetch every 1000 ms?
+- Fix [#170](https://github.com/SOHU-Co/kafka-node/issues/170): In case of `offsetOutOfRange`, the consumer should be paused.
+- Fix [#169](https://github.com/SOHU-Co/kafka-node/issues/169): When paused why try to fetch every 1000 ms?
 - Ref: remove unused variables.
6 changes: 4 additions & 2 deletions README.md
@@ -27,8 +27,9 @@ Closes the connection to Zookeeper and the brokers so that the node process can
 * `cb`: **Function**, the callback
 
 ## Producer
-### Producer(client)
+### Producer(client, [options])
 * `client`: client which keeps a connection with the Kafka server.
+* `options`: set `requireAcks` and `ackTimeoutMs` for producer, the default value is `{requireAcks: 1, ackTimeoutMs: 100}`
 
 ``` js
 var kafka = require('kafka-node'),
@@ -104,8 +105,9 @@ producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
 
 
 ## HighLevelProducer
-### HighLevelProducer(client)
+### HighLevelProducer(client, [options])
 * `client`: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions
+* `options`: set `requireAcks` and `ackTimeoutMs` for producer, the default value is `{requireAcks: 1, ackTimeoutMs: 100}`
 
 ``` js
 var kafka = require('kafka-node'),
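The README additions above only describe the new constructor signature; a minimal construction sketch (not part of this diff, and assuming a local ZooKeeper at `localhost:2181`) looks like this:

``` js
var kafka = require('kafka-node'),
    client = new kafka.Client('localhost:2181');

// Omitting options keeps the previous behaviour: { requireAcks: 1, ackTimeoutMs: 100 }.
var producer = new kafka.Producer(client);

// Both Producer and HighLevelProducer accept the same options object.
// requireAcks: 0 turns off broker acknowledgements entirely; ackTimeoutMs is the
// time the broker may spend waiting for the required acks before answering.
var highLevelProducer = new kafka.HighLevelProducer(client, { requireAcks: 0, ackTimeoutMs: 100 });
```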
2 changes: 1 addition & 1 deletion example/producer.js
@@ -7,7 +7,7 @@ var argv = require('optimist').argv;
 var topic = argv.topic || 'topic1';
 var p = argv.p || 0;
 var a = argv.a || 0;
-var producer = new Producer(client);
+var producer = new Producer(client, { requireAcks: 1 });
 
 producer.on('ready', function () {
   var message = 'a message';
20 changes: 14 additions & 6 deletions lib/client.js
@@ -133,9 +133,11 @@ Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs
 };
 
 Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) {
-  var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs),
-      decoder = protocol.decodeProduceResponse,
-      self = this;
+  var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs);
+  var decoder = protocol.decodeProduceResponse;
+  var self = this;
+
+  decoder.requireAcks = requireAcks;
 
   async.each(payloads, buildRequest, function (err) {
     if (err) return cb(err);
@@ -221,7 +223,7 @@ Client.prototype.loadMetadataForTopics = function (topics, cb) {
   }
 
   this.queueCallback(broker, correlationId, [protocol.decodeMetadataResponse, cb]);
-  broker && broker.write(request);
+  broker.write(request);
 };
 
 Client.prototype.createTopics = function (topics, isAsync, cb) {
@@ -415,8 +417,14 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
       if (broker.waitting) continue;
       broker.waitting = true;
     }
-    this.queueCallback(broker, correlationId, [decoder, cb]);
-    broker && broker.write(request);
+
+    if (decoder.requireAcks == 0) {
+      broker.write(request);
+      cb(null, { result: 'no ack' });
+    } else {
+      this.queueCallback(broker, correlationId, [decoder, cb]);
+      broker.write(request);
+    }
   }
 };
 
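Net effect of the client changes: `sendProduceRequest` stamps the requested ack level onto the produce decoder, and `sendToBroker` uses that stamp to decide whether to wait for a broker response at all. A sketch of the caller-visible difference (hypothetical topic and message values; `noAckProducer` is just an illustrative name):

``` js
var kafka = require('kafka-node'),
    client = new kafka.Client('localhost:2181'),
    noAckProducer = new kafka.Producer(client, { requireAcks: 0 });

noAckProducer.on('ready', function () {
  // requireAcks === 0: the request is written to the broker socket and the callback
  // fires immediately with the placeholder below; no [decoder, cb] pair is queued
  // under the correlationId and no produce response is read for this request.
  noAckProducer.send([{ topic: 'topic1', messages: 'fire and forget' }], function (err, data) {
    console.log(err || data); // { result: 'no ack' }
  });
});
// With requireAcks >= 1 (the default), the same send() call queues [decoder, cb]
// and the callback only fires once the broker's produce response has been decoded.
```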
10 changes: 7 additions & 3 deletions lib/highLevelProducer.js
@@ -29,13 +29,17 @@ var util = require('util'),
  * @constructor
  */
 var HighLevelProducer = function (client, options) {
-  var useOptions = options || {};
+  options = options || {};
 
   this.ready = false;
   this.client = client;
 
-  this.requireAcks = useOptions.requireAcks || DEFAULTS.requireAcks
-  this.ackTimeoutMs = useOptions.ackTimeoutMs || DEFAULTS.ackTimeoutMs
+  this.requireAcks = options.requireAcks === undefined
+    ? DEFAULTS.requireAcks
+    : options.requireAcks;
+  this.ackTimeoutMs = options.ackTimeoutMs === undefined
+    ? DEFAULTS.ackTimeoutMs
+    : options.ackTimeoutMs;
 
   this.connect();
 };
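The switch from `||` to an explicit `undefined` check in both producer constructors (the hunk above, and the identical one in lib/producer.js below) is presumably what makes `requireAcks: 0` usable at all: `0` is falsy, so the old `useOptions.requireAcks || DEFAULTS.requireAcks` would silently replace it with the default of `1`. A small standalone illustration:

``` js
var DEFAULTS = { requireAcks: 1, ackTimeoutMs: 100 };
var options = { requireAcks: 0 };

// Old pattern: 0 is falsy, so the default wins and acks are silently re-enabled.
console.log(options.requireAcks || DEFAULTS.requireAcks); // 1

// New pattern: fall back to the default only when the option was not supplied at all.
console.log(options.requireAcks === undefined ? DEFAULTS.requireAcks : options.requireAcks); // 0
```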
10 changes: 7 additions & 3 deletions lib/producer.js
Expand Up @@ -30,13 +30,17 @@ var util = require('util'),
* @constructor
*/
var Producer = function (client, options) {
var useOptions = options || {};
options = options || {};

this.ready = false;
this.client = client;

this.requireAcks = useOptions.requireAcks || DEFAULTS.requireAcks
this.ackTimeoutMs = useOptions.ackTimeoutMs || DEFAULTS.ackTimeoutMs
this.requireAcks = options.requireAcks === undefined
? DEFAULTS.requireAcks
: options.requireAcks;
this.ackTimeoutMs = options.ackTimeoutMs === undefined
? DEFAULTS.ackTimeoutMs
: options.ackTimeoutMs;

this.connect();
};
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,7 +1,7 @@
 {
   "name": "kafka-node",
   "description": "node client for Apache kafka, only support kafka 0.8 and above",
-  "version": "0.2.24",
+  "version": "0.2.25",
   "main": "kafka.js",
   "dependencies": {
     "async": "~0.9.0",
13 changes: 12 additions & 1 deletion test/test.producer.js
@@ -5,7 +5,7 @@ var kafka = require('..'),
     Client = kafka.Client,
     KeyedMessage = kafka.KeyedMessage;
 
-var client, producer;
+var client, producer, noAckProducer;
 
 var TOPIC_POSTFIX = '_test_' + Date.now();
 var EXISTS_TOPIC_3 = '_exists_3' + TOPIC_POSTFIX;
@@ -26,6 +26,7 @@ before(function (done) {
       setTimeout(done, 500);
     });
   });
+  noAckProducer = new Producer(client, { requireAcks: 0 });
 });
 
 describe('Producer', function () {
@@ -116,6 +117,16 @@ describe('Producer', function () {
         done();
       });
     });
+
+    it('should send message without ack', function (done) {
+      noAckProducer.send([{
+        topic: EXISTS_TOPIC_3, messages: 'hello kafka'
+      }], function (err, message) {
+        if (err) return done(err);
+        message.result.should.equal('no ack');
+        done();
+      });
+    })
   });
 
   describe('#createTopics', function () {