-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
KAFKA-3949: Avoid race condition when subscription changes during rebalance #1364
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow, this is a lot more work than I remembered when I investigated how Java fixed this... is that because you're also porting in other updates from the Java client? Or is my memory just faulty?
kafka/coordinator/consumer.py
Outdated
@@ -119,6 +121,7 @@ def __init__(self, client, subscription, metrics, **configs): | |||
self.consumer_sensors = ConsumerCoordinatorMetrics( | |||
metrics, self.config['metric_group_prefix'], self._subscription) | |||
|
|||
#self._handle_metadata_update(self._cluster) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add this if it's commented out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will delete -- thanks
# work that out right now. If you read this at some later date after the mutable | ||
# state has bitten you... I'm sorry! It mimics the java client, and that's the | ||
# best I've got for now. | ||
self._joined_subscription = set(self._subscription.subscription) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😃 loved this comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I know. Hate it, as the method is basically a getter. They put it there, just cause it should be in ConsumerCoordinator
but not in AbstractCoordinator
and only this method was overridable... Basically a big hack. BTW the method is called metadata()
nowdays. https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L148
This is just 3949. The upstream PR is here: apache/kafka#1762 |
9ae71f0
to
6ca8439
Compare
Fix #1241