
Can't add a consumer to an existing group with a StickyPartitionAssignor #2153

Closed
joein opened this issue Oct 30, 2020 · 3 comments

Comments

@joein

joein commented Oct 30, 2020

It is not possible to add a new consumer to an existing group that uses StickyPartitionAssignor. Encoding the assignor's metadata calls len() on a dict_itemiterator, which does not implement __len__.
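The root cause can be checked with the standard library alone. On Python 3, `six.iteritems(d)` is equivalent to `iter(d.items())`, so this sketch calls that directly. `partitions_by_topic` is a hypothetical stand-in for a consumer's current assignment grouped by topic, not kafka-python's actual code.

```python
from collections import defaultdict

# Hypothetical stand-in for a consumer's current assignment, grouped by topic.
partitions_by_topic = defaultdict(list)
partitions_by_topic["first_topic"].extend([0, 1])

# On Python 3, six.iteritems(d) returns iter(d.items()): a one-shot iterator.
items = iter(partitions_by_topic.items())
print(type(items).__name__)       # dict_itemiterator
print(hasattr(items, "__len__"))  # False

message = ""
try:
    len(items)
except TypeError as exc:
    message = str(exc)
print(message)  # object of type 'dict_itemiterator' has no len()
```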

Kafka version 1.0.1 (https://archive.apache.org/dist/kafka/1.0.1/kafka_2.11-1.0.1.tgz)
Python 3.8
kafka-python 2.0.2

Suppose we have first_consumer.py:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

addr = "127.0.0.1:9092"
group_id = "some_group_id"
topic = ["first_topic"]
consumer = KafkaConsumer(
    *topic,
    partition_assignment_strategy=(StickyPartitionAssignor,),
    bootstrap_servers=addr,
    group_id=group_id,
)
while True:
    consumer.poll(timeout_ms=4000, max_records=1)
    print(consumer.assignment())

And second_consumer.py:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

addr = "127.0.0.1:9092"
group_id = "some_group_id"
topic = ["second_topic"]
consumer = KafkaConsumer(
    *topic,
    partition_assignment_strategy=(StickyPartitionAssignor,),
    bootstrap_servers=addr,
    group_id=group_id,
)
while True:
    consumer.poll(timeout_ms=4000, max_records=1)
    print(consumer.assignment())

Then we launch first_consumer.py, wait for it to receive its assignment, and then launch second_consumer.py.
When the second consumer connects, the group rebalances and the first consumer rejoins by sending a JoinGroup request, which in turn calls assignor.metadata(). There, assignor.metadata() encodes user_data, which eventually reaches this code in kafka/protocol/types.py:

return b''.join(
    [Int32.encode(len(items))] +
    [self.array_of.encode(item) for item in items]
)

However, items is a dict_itemiterator (the object returned by six.iteritems()), which does not define __len__. This causes the following exception:

File "first_consumer.py", line 12, in consume
    records = consumer.poll(timeout_ms=4000, max_records=1)
  File "/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 675, in _poll_once
    self._coordinator.poll()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 289, in poll
    self.ensure_active_group()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/base.py", line 390, in ensure_active_group
    future = self._send_join_group_request()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/base.py", line 453, in _send_join_group_request
    for protocol, metadata in self.group_protocols()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 154, in group_protocols
    metadata = assignor.metadata(self._joined_subscription)
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/assignors/sticky/sticky_assignor.py", line 660, in metadata
    user_data = data.encode()
  File "/venv/lib/python3.8/site-packages/kafka/util.py", line 50, in __call__
    return self.method()(self.target(), *args, **kwargs)
  File "/venv/lib/python3.8/site-packages/kafka/protocol/struct.py", line 42, in _encode_self
    return self.SCHEMA.encode(
  File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 146, in encode
    return b''.join([
  File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 147, in <listcomp>
    field.encode(item[i])
  File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 185, in encode
    [Int32.encode(len(items))] +
TypeError: object of type 'dict_itemiterator' has no len()
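A minimal sketch of the failure and one possible workaround, assuming only that the encoder writes an int32 element count before the elements, as in the `encode` snippet quoted earlier. `encode_array`, `encode_topic_partitions`, and the (topic, partitions) byte layout are hypothetical helpers chosen for illustration, not kafka-python's API or the exact sticky user-data schema.

```python
import struct
from typing import Any, Callable


def encode_int32(value: int) -> bytes:
    # Big-endian signed 32-bit integer.
    return struct.pack(">i", value)


def encode_string(value: str) -> bytes:
    # Big-endian int16 byte length, then the UTF-8 bytes.
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_array(items: Any, encode_item: Callable[[Any], bytes]) -> bytes:
    # Hypothetical helper with the same shape as the quoted snippet:
    # an int32 element count followed by each encoded element.
    # len() raises TypeError if `items` is an iterator such as dict_itemiterator.
    return b"".join([encode_int32(len(items))] + [encode_item(item) for item in items])


def encode_topic_partitions(item: Any) -> bytes:
    # Hypothetical element encoder for (topic, [partition, ...]) pairs.
    topic, partitions = item
    return encode_string(topic) + encode_array(partitions, encode_int32)


partitions_by_topic = {"first_topic": [0, 1]}

# Passing iter(d.items()) (what six.iteritems returns on Python 3)
# reproduces the TypeError from the traceback.
error_message = ""
try:
    encode_array(iter(partitions_by_topic.items()), encode_topic_partitions)
except TypeError as exc:
    error_message = str(exc)
print(error_message)

# Materializing the pairs into a list gives len() a sized container.
encoded = encode_array(list(partitions_by_topic.items()), encode_topic_partitions)
print(len(encoded), encoded.hex())
```

Wrapping the iterator in list() is the simplest way to satisfy both len() and iteration; this thread does not say whether #2154 takes exactly this approach.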

Maybe @aynroot can help here.

@jeffwidman
Contributor

Wow, fantastic bug report! Thank you so much.

@jeffwidman
Contributor

@joein does #2154 fix it for you?

@joein
Author

joein commented Nov 2, 2020

Yeah, it doesn't crash anymore. Feel free to close this issue :)

Thanks @aynroot @jeffwidman
