Skip to content

Commit

Permalink
Initialize metadata_snapshot in group coordinator (#1174)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp authored Aug 13, 2017
1 parent 422189b commit 4b32b2e
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(self, client, subscription, metrics, **configs):
assert self.config['assignors'], 'Coordinator requires assignors'

self._subscription = subscription
self._metadata_snapshot = {}
self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
self._assignment_snapshot = None
self._cluster = client.cluster
self._cluster.request_update()
Expand Down Expand Up @@ -162,15 +162,18 @@ def _handle_metadata_update(self, cluster):
for partition in self._metadata_snapshot[topic]
])

def _subscription_metadata_changed(self, cluster):
if not self._subscription.partitions_auto_assigned():
return False

def _build_metadata_snapshot(self, subscription, cluster):
metadata_snapshot = {}
for topic in self._subscription.group_subscription():
for topic in subscription.group_subscription():
partitions = cluster.partitions_for_topic(topic) or []
metadata_snapshot[topic] = set(partitions)
return metadata_snapshot

def _subscription_metadata_changed(self, cluster):
if not self._subscription.partitions_auto_assigned():
return False

metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
if self._metadata_snapshot != metadata_snapshot:
self._metadata_snapshot = metadata_snapshot
return True
Expand Down

0 comments on commit 4b32b2e

Please # to comment.