Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

auto commit and add event for rebalance #889

Merged
merged 5 commits into from
Mar 15, 2018

Conversation

aikar
Copy link
Contributor

@aikar aikar commented Mar 7, 2018

Should resolve #787

If auto commit is true, commit before we rejoin the group on a rebalance
According to the kafka protcol, commits are allowed during this stage, as the generation ID has not been bumped yet.

If auto commit is disabled, implementors need a way to decide what to commit, so an onRebalance option was added.

Implementors can perform commits during this method and call back when done to finish the rebalance.

Should resolve SOHU-Co#787

If auto commit is true, commit before we rejoin the group on a rebalance
According to the kafka protcol, commits are allowed during this stage, as the generation ID has not been bumped yet.

If auto commit is disabled, implementors need a way to decide what to commit, so an onRebalance option was added.

Implementors can perform commits during this method and call back when done to finish the rebalance.
@aikar aikar closed this Mar 7, 2018
@aikar aikar reopened this Mar 7, 2018
This required special behavior due to the separate commit queue
@aikar
Copy link
Contributor Author

aikar commented Mar 7, 2018

Depends on #891 to be pulled first, as this work helped surface the off by 1 error fixed in #891

@aikar
Copy link
Contributor Author

aikar commented Mar 7, 2018

This should be good to go now (Travis has been pretty unreliable lately, test did pass)

An example of using the onRebalance event:

    async _onRebalance(isMember: boolean, cb: Function) {
        try {
            if (isMember) {
                this.logger.info("Rebalancing");
                // Clear and wait for any pending event dispatches, giving them a chance to commit
                await this._clearAndWaitForQueue();
                // Now any pending tasks are in the commit queue, which will be picked up after this method finishes
            }
            this.emit("kafka:rebalance");
        } catch (e) {
            this.logger.error("ErrorKafkaRebalance", e);
        } finally {
            cb();
        }
    }

@aikar
Copy link
Contributor Author

aikar commented Mar 12, 2018

@hyperlink is anything holding this up?

const originalOnRebalance = options.onRebalance;
options.onRebalance = function (isAlreadyMember, callback) {
let autoCommitCalled = false;
const autoCommit = function (err) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could warp this in _.once to avoid having a flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Collaborator

@hyperlink hyperlink left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution!

@hyperlink hyperlink merged commit bd9d566 into SOHU-Co:master Mar 15, 2018
@hugebdu
Copy link

hugebdu commented Apr 8, 2018

@hyperlink dear Sir, any plans to release this?
10x

@hyperlink
Copy link
Collaborator

@hugebdu I will get it prepared.

@hyperlink
Copy link
Collaborator

@hugebdu holding off for now. I'm tracking down an error when running the tests locally.

@hugebdu
Copy link

hugebdu commented Apr 9, 2018

@hyperlink 10x for update. waiting patiently

@hyperlink
Copy link
Collaborator

@hugebdu I found the issue #918

@hyperlink
Copy link
Collaborator

Published as 2.5.0.

@hugebdu
Copy link

hugebdu commented Apr 9, 2018

@hyperlink thanks a lot!

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ConsumerGroup: whenever new consumer is added to group, first few messages consumed twice.
3 participants