Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix missing support in ConsumerGroup for fromOffset using earliest and none #483

Merged
merged 1 commit into from
Oct 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,11 @@ var options = {
// An array of partition assignment protocols ordered by preference.
// 'roundrobin' or 'range' string for built ins (see below to pass in custom assignment protocol)
protocol: ['roundrobin'],
fromOffset: 'latest', // for new groups read messages from the latest offsets (defaults to the earliest available offset)

// Offsets to use for new groups other options could be 'earliest' or 'none' (none will emit an error if no offsets were saved)
// equivalent to Java client's auto.offset.reset
fromOffset: 'latest', // default

migrateHLC: false, // for details please see Migration section below
migrateRolling: true
};
Expand Down
4 changes: 2 additions & 2 deletions example/consumerGroupMember.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ var consumerOptions = {
host: '127.0.0.1:2181',
groupId: 'ExampleTestGroup',
sessionTimeout: 15000,
protocol: ['range'],
fromOffset: 'latest'
protocol: ['roundrobin'],
fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
};

var topics = ['RebalanceTopic', 'RebalanceTest'];
Expand Down
43 changes: 30 additions & 13 deletions lib/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ const createTopicPartitionList = require('./utils').createTopicPartitionList;

const assert = require('assert');
const builtInProtocols = require('./assignment');
var LATEST_OFFSET = -1;

const LATEST_OFFSET = -1;
const EARLIEST_OFFSET = -2;
const ACCEPTED_FROM_OFFSET = {
latest: LATEST_OFFSET,
earliest: EARLIEST_OFFSET,
none: false
};

const DEFAULTS = {
groupId: 'kafka-node-group',
// Auto commit config
Expand All @@ -26,7 +34,7 @@ const DEFAULTS = {
fetchMinBytes: 1,
fetchMaxBytes: 1024 * 1024,
maxTickMessages: 1000,
fromOffset: false,
fromOffset: 'latest',
sessionTimeout: 30000,
retries: 10,
retryFactor: 1.8,
Expand All @@ -45,6 +53,10 @@ function ConsumerGroup (memberOptions, topics) {
memberOptions.ssl = {};
}

if (!(this.options.fromOffset in ACCEPTED_FROM_OFFSET)) {
throw new Error(`fromOffset ${this.options.fromOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`);
}

this.client = new Client(memberOptions.host, memberOptions.id, memberOptions.zk,
memberOptions.batch, memberOptions.ssl);

Expand Down Expand Up @@ -177,18 +189,18 @@ ConsumerGroup.prototype.handleJoinGroup = function (joinGroupResponse, callback)
callback(null, groupAssignment);
};

ConsumerGroup.prototype.saveLatestOffsets = function (topicPartitionList, callback) {
ConsumerGroup.prototype.saveDefaultOffsets = function (topicPartitionList, callback) {
var self = this;
const offsetPayload = _(topicPartitionList).cloneDeep().map(function (tp) {
tp.time = LATEST_OFFSET;
const offsetPayload = _(topicPartitionList).cloneDeep().map(tp => {
tp.time = ACCEPTED_FROM_OFFSET[this.options.fromOffset];
return tp;
});

self.getOffset().fetch(offsetPayload, function (error, result) {
if (error) {
return callback(error);
}
self.latestOffsets = _.mapValues(result, function (partitionOffsets) {
self.defaultOffsets = _.mapValues(result, function (partitionOffsets) {
return _.mapValues(partitionOffsets, _.first);
});
callback(null);
Expand All @@ -203,7 +215,7 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)
debug('%s owns topics: ', self.client.clientId, syncGroupResponse.partitions);

const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions);
const useLatestOffsets = self.options.fromOffset === 'latest';
const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET;

async.waterfall([
function (callback) {
Expand All @@ -218,6 +230,11 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)

if (noOffset) {
debug('No saved offsets');

if (self.options.fromOffset === 'none') {
return callback(new Error(`${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${self.options.groupId}'`));
}

async.parallel([
function (callback) {
if (self.migrator) {
Expand All @@ -226,16 +243,16 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)
callback(null);
},
function (callback) {
if (useLatestOffsets) {
return self.saveLatestOffsets(topicPartitionList, callback);
if (useDefaultOffsets) {
return self.saveDefaultOffsets(topicPartitionList, callback);
}
callback(null);
}
], function (error) {
if (error) {
return callback(error);
}
debug('%s latestOffset Response: %j', self.client.clientId, self.latestOffsets);
debug('%s defaultOffset Response for %s: %j', self.client.clientId, self.options.fromOffset, self.defaultOffsets);
callback(null, offsets);
});
} else {
Expand All @@ -247,7 +264,7 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)
self.topicPayloads = self.buildPayloads(topicPartitionList).map(function (p) {
var offset = offsets[p.topic][p.partition];
if (offset === -1) { // -1 means no offset was saved for this topic/partition combo
offset = useLatestOffsets ? self.getLatestOffset(p, 0) : 0;
offset = useDefaultOffsets ? self.getDefaultOffset(p, 0) : 0;
if (self.migrator) {
offset = self.migrator.getOffset(p, offset);
}
Expand All @@ -263,8 +280,8 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)
}
};

ConsumerGroup.prototype.getLatestOffset = function (tp, defaultOffset) {
return _.get(this.latestOffsets, [tp.topic, tp.partition], defaultOffset);
ConsumerGroup.prototype.getDefaultOffset = function (tp, defaultOffset) {
return _.get(this.defaultOffsets, [tp.topic, tp.partition], defaultOffset);
};

ConsumerGroup.prototype.getOffset = function () {
Expand Down
75 changes: 63 additions & 12 deletions test/test.consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,29 @@ describe('ConsumerGroup', function () {
}, 'SampleTopic');
sinon.assert.calledWithExactly(fakeClient, 'myhost', 'myClientId', undefined, undefined, ssl);
});

it('should throw an error if using an invalid fromOffset', function () {
[true, false, '', 0, 1, 'blah'].forEach(offset => {
should.throws(() => {
// eslint-disable-next-line no-new
new ConsumerGroup({
fromOffset: offset
});
});
});
});

it('should not throw an error if using an valid fromOffset', function () {
['earliest', 'latest', 'none'].forEach(offset => {
should.doesNotThrow(() => {
// eslint-disable-next-line no-new
new ConsumerGroup({
fromOffset: offset,
connectOnReady: false
}, 'TestTopic');
});
});
});
});

describe('#sendHeartbeats', function () {
Expand Down Expand Up @@ -165,13 +188,13 @@ describe('ConsumerGroup', function () {
};

sandbox.stub(consumerGroup, 'fetchOffset').yields(null, fetchOffsetResponse);
sandbox.stub(consumerGroup, 'saveLatestOffsets').yields(null);
sandbox.stub(consumerGroup, 'saveDefaultOffsets').yields(null);

consumerGroup.handleSyncGroup(syncGroupResponse, function (error, ownsPartitions) {
ownsPartitions.should.be.true;
sinon.assert.calledWith(consumerGroup.fetchOffset, syncGroupResponse.partitions);

sinon.assert.notCalled(consumerGroup.saveLatestOffsets);
sinon.assert.notCalled(consumerGroup.saveDefaultOffsets);
sinon.assert.notCalled(consumerGroup.migrator.saveHighLevelConsumerOffsets);

const topicPayloads = _(consumerGroup.topicPayloads);
Expand All @@ -197,7 +220,7 @@ describe('ConsumerGroup', function () {
}
};

const latestOffsets = {
const defaultOffsets = {
TestTopic: {
0: 10,
2: 20,
Expand All @@ -222,16 +245,16 @@ describe('ConsumerGroup', function () {
}
};

consumerGroup.latestOffsets = latestOffsets;
consumerGroup.defaultOffsets = defaultOffsets;
consumerGroup.migrator.offsets = migrateOffsets;
sandbox.stub(consumerGroup, 'fetchOffset').yields(null, fetchOffsetResponse);
sandbox.stub(consumerGroup, 'saveLatestOffsets').yields(null);
sandbox.stub(consumerGroup, 'saveDefaultOffsets').yields(null);

consumerGroup.handleSyncGroup(syncGroupResponse, function (error, ownsPartitions) {
ownsPartitions.should.be.true;
sinon.assert.calledWith(consumerGroup.fetchOffset, syncGroupResponse.partitions);

sinon.assert.calledOnce(consumerGroup.saveLatestOffsets);
sinon.assert.calledOnce(consumerGroup.saveDefaultOffsets);
sinon.assert.calledOnce(consumerGroup.migrator.saveHighLevelConsumerOffsets);

const topicPayloads = _(consumerGroup.topicPayloads);
Expand All @@ -245,6 +268,34 @@ describe('ConsumerGroup', function () {
});
});

describe('options.fromOffset is "none"', function () {
it('should yield error when there is not saved offsets', function (done) {
consumerGroup.options.fromOffset = 'none';
const syncGroupResponse = {
partitions: {
TestTopic: [0, 2, 3, 4]
}
};

const fetchOffsetResponse = {
TestTopic: {
0: 10,
2: -1,
3: -1,
4: -1
}
};

sandbox.stub(consumerGroup, 'fetchOffset').yields(null, fetchOffsetResponse);
sandbox.stub(consumerGroup, 'saveDefaultOffsets').yields(null);
consumerGroup.handleSyncGroup(syncGroupResponse, function (error, ownsPartitions) {
should(ownsPartitions).be.undefined;
error.should.be.a.Error;
done();
});
});
});

describe('options.fromOffset is "latest"', function () {
it('should not fetch latestOffset if all offsets have saved previously', function (done) {
consumerGroup.options.fromOffset = 'latest';
Expand All @@ -265,12 +316,12 @@ describe('ConsumerGroup', function () {
};

sandbox.stub(consumerGroup, 'fetchOffset').yields(null, fetchOffsetResponse);
sandbox.stub(consumerGroup, 'saveLatestOffsets').yields(null);
sandbox.stub(consumerGroup, 'saveDefaultOffsets').yields(null);

consumerGroup.handleSyncGroup(syncGroupResponse, function (error, ownsPartitions) {
ownsPartitions.should.be.true;
sinon.assert.calledWith(consumerGroup.fetchOffset, syncGroupResponse.partitions);
sinon.assert.notCalled(consumerGroup.saveLatestOffsets);
sinon.assert.notCalled(consumerGroup.saveDefaultOffsets);

const topicPayloads = _(consumerGroup.topicPayloads);

Expand Down Expand Up @@ -300,22 +351,22 @@ describe('ConsumerGroup', function () {
}
};

const latestOffsets = {
const defaultOffsets = {
TestTopic: {
0: 10,
2: 3,
4: 5000
}
};

consumerGroup.latestOffsets = latestOffsets;
consumerGroup.defaultOffsets = defaultOffsets;
sandbox.stub(consumerGroup, 'fetchOffset').yields(null, fetchOffsetResponse);
sandbox.stub(consumerGroup, 'saveLatestOffsets').yields(null);
sandbox.stub(consumerGroup, 'saveDefaultOffsets').yields(null);

consumerGroup.handleSyncGroup(syncGroupResponse, function (error, ownsPartitions) {
ownsPartitions.should.be.true;
sinon.assert.calledWith(consumerGroup.fetchOffset, syncGroupResponse.partitions);
sinon.assert.calledOnce(consumerGroup.saveLatestOffsets);
sinon.assert.calledOnce(consumerGroup.saveDefaultOffsets);

const topicPayloads = _(consumerGroup.topicPayloads);

Expand Down