AttributeError: 'PartitionRecords' object has no attribute 'message_idx' #1290

Closed
kerfab opened this issue Nov 9, 2017 · 15 comments

kerfab commented Nov 9, 2017

kafka-python is 1.3.5

My code is basically trying to do:

pollResults = self._consumer.poll(timeout, maxResults)

and I run into the following exception from within the library:

  File "/usr/lib/python3.6/site-packages/kafka/consumer/group.py", line 570, in poll
    records = self._poll_once(remaining, max_records)
  File "/usr/lib/python3.6/site-packages/kafka/consumer/group.py", line 604, in _poll_once
    records, partial = self._fetcher.fetched_records(max_records)
  File "/usr/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 336, in fetched_records
    if not self._next_partition_records:
  File "/usr/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 855, in __len__
    if self.messages is None or self.message_idx >= len(self.messages):
AttributeError: 'PartitionRecords' object has no attribute 'message_idx'
dpkp added the bug label Nov 9, 2017
dpkp self-assigned this Nov 9, 2017
dpkp (Owner) commented Nov 9, 2017

Are you seeing any errors in the logs related to unpacking message sets?

kerfab (Author) commented Nov 9, 2017

No, this error is raised on a simple poll(), and previous versions of the library that I used never had this issue.

I inspected the code, and the __init__() function does not set message_idx to any default value. It does set it inside a loop, but never before it:

    class PartitionRecords(object):
        def __init__(self, fetch_offset, tp, messages):
            self.fetch_offset = fetch_offset
            self.topic_partition = tp
            self.messages = messages
            for i, msg in enumerate(messages):
                if msg.offset == fetch_offset:
                    self.message_idx = i  # <------------ Not inited before !!

The error presumably occurs because messages is empty, or because msg.offset never equals fetch_offset (I am not sure which), so message_idx is never set by the time __len__() accesses it.

I suspect a race condition or something similar, because the error occurs almost "randomly" and was discovered on a service starting 40 consumers (individual processes, not threads) at once. Different processes crash with this error when the entire pool is restarted, so it is not tied to a specific topic (the 40 processes all consume different topics).

I think the error can be avoided by simply initializing the variable in the constructor, but I am unsure what the default value should be; otherwise I would gladly open a PR.
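
To make the failure mode concrete, here is a minimal self-contained sketch (a simplified copy of the class and of the __len__ check from the traceback, not the library code itself):

    from collections import namedtuple

    Message = namedtuple('Message', ['offset', 'value'])

    class PartitionRecords(object):
        def __init__(self, fetch_offset, tp, messages):
            self.fetch_offset = fetch_offset
            self.topic_partition = tp
            self.messages = messages
            for i, msg in enumerate(messages):
                if msg.offset == fetch_offset:
                    self.message_idx = i  # only assigned on an exact offset match

        def __len__(self):
            # mirrors the failing line in fetcher.py: message_idx is read
            # even though __init__ may never have assigned it
            if self.messages is None or self.message_idx >= len(self.messages):
                return 0
            return len(self.messages) - self.message_idx

    # No message offset equals the fetch offset, so message_idx is never set:
    records = PartitionRecords(0, ('topic', 0), [Message(5, b'a'), Message(6, b'b')])
    len(records)  # AttributeError: 'PartitionRecords' object has no attribute 'message_idx'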

dpkp (Owner) commented Nov 9, 2017

Yes, you are correct. My question is whether you see any other error log messages related to message unpacking? The root cause is a non-empty messageset yielding an empty list of actual messages. This is strange and unexpected behavior and suggests either that there is a lower level protocol issue, that message compression is failing, or that the message unpacking itself has a bug. The fix for the attribute error is obvious, but I'm much more interested in determining if there is a root cause.

kerfab (Author) commented Nov 9, 2017

I don't have access to the logs right now, but I do not think I have more to report: everything in the stack trace above what I copy-pasted (and which is not shown in this bug report) is my own function calls. To be sure, I will review the logs tomorrow and update this ticket accordingly.

kerfab (Author) commented Nov 10, 2017

Just a status update: I was not able to gather the logs today, so I will do it on Monday.

kerfab (Author) commented Nov 13, 2017

Logs before crash:

[client_async.py@_bootstrap:233] Bootstrapping cluster metadata from [('kafka-1', 9092, <AddressFamily.AF_UNSPEC: 0>)]
[conn.py@connect:322] <BrokerConnection node_id=bootstrap host=kafka-1/10.***.***.*** port=9092>: connecting to 10.***.***.***:9092
[client_async.py@_bootstrap:273] Bootstrap succeeded: found 1 brokers and 335 topics.
[conn.py@close:656] <BrokerConnection node_id=bootstrap host=kafka-1/10.***.***.*** port=9092>: Closing connection. 
[conn.py@connect:322] <BrokerConnection node_id=1 host=*****/10.***.***.*** port=9093>: connecting to 10.***.***.***:9093
[conn.py@check_version:933] Broker version identifed as 0.11.0
[conn.py@check_version:935] Set configuration api_version=(0, 11, 0) to skip auto check_version requests on startup
[subscription_state.py@change_subscription:172] Updating subscribed topics to: ['****', '***', (a topic list of 9 topics)]
[cluster.py@add_group_coordinator:342] Group coordinator for ******* is BrokerMetadata(nodeId=1, host='****', port=9093, rack=None)
[base.py@_handle_group_coordinator_response:536] Discovered coordinator 1 for *****
[consumer.py@_on_join_prepare:272] Revoking previously assigned partitions set() for group *****
[base.py@_send_join_group_request:317] (Re-)joining group ******
[base.py@_handle_join_group_response:356] Joined group '*****' (generation 21626) with member_id kafka-python-1.3.6.dev-55e2f1a9-05be-4dfb-9e15-0c1d79b0fca4
[base.py@_handle_join_group_response:360] Elected group leader -- performing partition assignments using range
[base.py@_handle_sync_group_response:464] Successfully joined ***** with generation 21626
[subscription_state.py@assign_from_subscribed:258] Updated partition assignment: [TopicPartition(topic='****', partition=0), TopicPartition(topic='***', partition=0), etc. list for 9 topics/partitions]
[consumer.py@_on_join_complete:218] Setting newly assigned partitions {TopicPartition(topic='****', partition=0), TopicPartition(topic='****', partition=0), etc. list for 9 topics partitions } for group ******

There is no specific issue related to message unpacking; in fact, everything looks as fine as usual.

After those logs, my code calls poll() and uses len() to check how many results were retrieved (I poll() for 1 message only, so I never expect more than one).

And it crashes.
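
The calling pattern is roughly the following (a simplified sketch of what my code does; the timeout value and the handler are illustrative, not the real code):

    pollResults = self._consumer.poll(timeout_ms=5000, max_records=1)  # the AttributeError is raised inside poll()
    for tp, records in pollResults.items():
        for msg in records:
            handle(msg)  # hypothetical handler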

rmechler commented

I have the same problem. Very simple consumer: it worked fine with 1.3.4, but poll() fails every time with 1.3.5 and current master. Brokers are running Kafka 0.11. Here's my client code, with host names changed. poll() hangs for a number of seconds and then fails (or times out, and then fails on the next try if I use a shorter timeout).

#!/usr/bin/python -u

from kafka import KafkaConsumer

topic = 'test_topic'
group_id = 'crash_test'
bootstrap_servers = 'node1.my_kafka.com:9093,node2.my_kafka.com:9093,node3.my_kafka.com:9093'

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=bootstrap_servers,
    group_id=group_id,
    security_protocol='SSL',
    ssl_cafile='/etc/ssl/kafka-ca.crt',
    ssl_certfile='/etc/ssl/kafka-client.crt',
    ssl_keyfile='/etc/ssl/kafka-client.key',
    ssl_check_hostname=True,
    auto_offset_reset='earliest'
)

try:
    print('consuming')

    while True:

        print('calling poll')
        results = consumer.poll(timeout_ms=30000)
        print('returned from poll')
        if results:
            for records in results.values():
                for msg in records:
                    print(msg.key, msg.value)
        else:
            print("no results")

except KeyboardInterrupt:
    pass

consumer.close()

The output is:

# ./crash_test
consuming
calling poll
Traceback (most recent call last):
  File "./crash_test", line 27, in <module>
    results = consumer.poll(timeout_ms=30000)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 570, in poll
    records = self._poll_once(remaining, max_records)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 618, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 336, in fetched_records
    if not self._next_partition_records:
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 855, in __len__
    if self.messages is None or self.message_idx >= len(self.messages):
AttributeError: 'PartitionRecords' object has no attribute 'message_idx'

dpkp (Owner) commented Nov 16, 2017

Are your topics compressed? compacted? Both? Neither?

rmechler commented

Compacted, but not compressed.

kerfab (Author) commented Nov 16, 2017

Same here, compacted but not compressed.

rmechler commented

I put a little debugging in the PartitionRecords() constructor:

    class PartitionRecords(object):
        def __init__(self, fetch_offset, tp, messages):
            self.fetch_offset = fetch_offset
            self.topic_partition = tp
            self.messages = messages
            print("fetch_offset: {}".format(fetch_offset))
            # When fetching an offset that is in the middle of a
            # compressed batch, we will get all messages in the batch.
            # But we want to start 'take' at the fetch_offset
            for i, msg in enumerate(messages):
                print("message offset: {}".format(msg.offset))
                if msg.offset == fetch_offset:
                    self.message_idx = i

and got the following result:

# ./crash_test
consuming
calling poll
fetch_offset: 0
message offset: 165
message offset: 166
message offset: 167
message offset: 168
message offset: 169
message offset: 170
message offset: 171
message offset: 172
message offset: 173
[snip]
message offset: 3757
message offset: 3758
message offset: 3759
message offset: 3760
message offset: 3761
message offset: 3762
Traceback (most recent call last):
  File "./crash_test", line 27, in <module>
    results = consumer.poll(timeout_ms=30000)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 570, in poll
    records = self._poll_once(remaining, max_records)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 618, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 336, in fetched_records
    if not self._next_partition_records:
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 857, in __len__
    if self.messages is None or self.message_idx >= len(self.messages):
AttributeError: 'PartitionRecords' object has no attribute 'message_idx'

So self.message_idx never gets set. It feels like self.message_idx should be set to the first message offset, if it is greater than the fetch offset. But I don't really know the code, so I could be wrong.

rmechler commented

Oops, that would be wrong. Perhaps it should be initialized to 0, so it points to the first message if there is no offset match.
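
For concreteness, that suggestion would look something like this (a sketch of the idea only, not necessarily the project's actual fix):

    class PartitionRecords(object):
        def __init__(self, fetch_offset, tp, messages):
            self.fetch_offset = fetch_offset
            self.topic_partition = tp
            self.messages = messages
            # Default to the first message so __len__ never sees the
            # attribute missing; on a compacted topic the exact
            # fetch_offset may have been deleted, so the loop below
            # may never match.
            self.message_idx = 0
            for i, msg in enumerate(messages):
                if msg.offset == fetch_offset:
                    self.message_idx = i
                    break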

dpkp (Owner) commented Nov 17, 2017

This issue was caused by ffc7cae and I think we should probably just revert that for now and head back to the drawing board on that issue.

dpkp added a commit that referenced this issue Nov 17, 2017: "The change caused a regression documented in issue #1290"
dpkp (Owner) commented Nov 17, 2017

I've reverted the PR that caused this error and reopened that issue (related to seeking in a compressed topic). I am closing this issue. Please feel free to re-open if the AttributeError persists.

dpkp closed this as completed Nov 17, 2017
kerfab (Author) commented Nov 22, 2017

I know it's closed, but for the record: while the topic is not compressed per se, the messages we send are compressed, via compression_type='gzip' on the KafkaProducer instances we use.
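
For reference, the producer-side setting in question (a minimal sketch; the bootstrap address, topic, and payload are illustrative):

    from kafka import KafkaProducer

    # The topic itself is compacted but not compressed; compression is
    # applied by the producer to the messages it sends.
    producer = KafkaProducer(
        bootstrap_servers='kafka-1:9092',  # illustrative
        compression_type='gzip',
    )
    producer.send('some_topic', b'payload')  # illustrative topic and payload
    producer.flush()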
