Revise AbstractConsumerSeekAware.onPartitionsAssigned() for concurrency #3373

Closed
artembilan opened this issue Jul 15, 2024 · 17 comments · Fixed by #3410

@artembilan
Member

It looks like different partitions are registered for the same ConsumerSeekCallback in a multi-@KafkaListener scenario.
See AbstractConsumerSeekAwareTests and its TODO.

When we have different groups and concurrency, different listener container instances deal with different partitions.
But it looks like we add all of them to the same map entry:

this.callbackToTopics.computeIfAbsent(threadCallback, key -> new LinkedList<>()).add(tp);

This way, when we call seek later from the logic, it may lead to an error like:

2024-07-15 20:30:17,431 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] [org.springframework.kafka.listener.KafkaMessageListenerContainer] - Exception while seeking TopicPartitionOffset{topicPartition=Seek-0, offset=null, relativeToCurrent=false, position=BEGINNING}
java.lang.IllegalStateException: No current assignment for partition Seek-0
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367) ~[kafka-clients-3.7.1.jar:?]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:650) ~[kafka-clients-3.7.1.jar:?]
	at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4966) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:648) ~[kafka-clients-3.7.1.jar:?]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.seekToBeginning(LegacyKafkaConsumer.java:804) ~[kafka-clients-3.7.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1178) ~[kafka-clients-3.7.1.jar:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processSeeks(KafkaMessageListenerContainer.java:3040) [main/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1397) [main/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1298) [main/:?]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
2024-07-15 20:30:17,431 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] [org.springframework.kafka.listener.KafkaMessageListenerContainer] - Exception while seeking TopicPartitionOffset{topicPartition=Seek-2, offset=null, relativeToCurrent=false, position=BEGINNING}
java.lang.IllegalStateException: No current assignment for partition Seek-2
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367) ~[kafka-clients-3.7.1.jar:?]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:650) ~[kafka-clients-3.7.1.jar:?]
	at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4966) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:648) ~[kafka-clients-3.7.1.jar:?]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.seekToBeginning(LegacyKafkaConsumer.java:804) ~[kafka-clients-3.7.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1178) ~[kafka-clients-3.7.1.jar:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processSeeks(KafkaMessageListenerContainer.java:3040) [main/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1397) [main/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1298) [main/:?]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
2024-07-15 20:30:17,432 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] [org.springframework.kafka.listener.KafkaMessageListenerContainer] - Exception while seeking TopicPartitionOffset{topicPartition=Seek-1, offset=null, relativeToCurrent=false, position=BEGINNING}
java.lang.IllegalStateException: No current assignment for partition Seek-1
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367) ~[kafka-clients-3.7.1.jar:?]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:650) ~[kafka-clients-3.7.1.jar:?]
	at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4966) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:648) ~[kafka-clients-3.7.1.jar:?]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.seekToBeginning(LegacyKafkaConsumer.java:804) ~[kafka-clients-3.7.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1178) ~[kafka-clients-3.7.1.jar:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processSeeks(KafkaMessageListenerContainer.java:3040) [main/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1397) [main/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1298) [main/:?]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) [?:?]
	at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
@bky373
Contributor

bky373 commented Jul 15, 2024

@artembilan
Thanks for reporting!
Could you please point me to the code that reproduces it?
I've used different group IDs in multiple listeners, but I haven't figured out the problem yet.

@artembilan
Member Author

See the AbstractConsumerSeekAwareTests you introduced and my TODO on the concurrency attribute.

I have the impression that the same callback is used for different groups (or different container instances in the case of concurrency), while the partitions are distributed between containers. However, that add() to the map's list means the callback may later try to seek a partition that does not belong to the current container.
It does not happen consistently. Probably it is OK when both groups have the same partition distribution.
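To make the suspected bookkeeping problem concrete, here is a simplified, hypothetical model in plain Java with no Kafka dependency. `SharedCallbackModel`, `Callback`, and the nested `TopicPartition` record are illustrative stand-ins, not Spring for Apache Kafka types. It only shows that if two containers resolve to the same map key, `computeIfAbsent(...).add(tp)` merges their disjoint partitions into one list.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical model: not the real AbstractConsumerSeekAware implementation.
final class SharedCallbackModel {

    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) { }

    // Stand-in for ConsumerSeekCallback; only key equality matters here.
    record Callback(String name) { }

    private final Map<Callback, List<TopicPartition>> callbackToTopics = new HashMap<>();

    // Mirrors the registration line quoted in the issue.
    void onPartitionsAssigned(Callback threadCallback, List<TopicPartition> assigned) {
        for (TopicPartition tp : assigned) {
            this.callbackToTopics.computeIfAbsent(threadCallback, key -> new LinkedList<>()).add(tp);
        }
    }

    List<TopicPartition> partitionsFor(Callback callback) {
        return this.callbackToTopics.getOrDefault(callback, List.of());
    }

    static List<TopicPartition> demo() {
        SharedCallbackModel model = new SharedCallbackModel();
        Callback shared = new Callback("same-callback");
        // Two containers own disjoint partitions but register under the same key.
        model.onPartitionsAssigned(shared, List.of(new TopicPartition("Seek", 0), new TopicPartition("Seek", 1)));
        model.onPartitionsAssigned(shared, List.of(new TopicPartition("Seek", 2)));
        // "All partitions for this callback" now spans both containers.
        return model.partitionsFor(shared);
    }
}
```

Seeking every partition in that merged list from one container would target partitions that only the other container owns, which is the failure mode suspected here.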

@bky373
Contributor

bky373 commented Jul 15, 2024

Thanks!

I'll have to look into the issue further, but for personal reasons I won't be able to dive into it until next week. I'll try to get to it as soon as I can.

@artembilan
Member Author

No worries! The first milestone is out today. The next one is in a month, so we have plenty of time 😅

@bky373
Contributor

bky373 commented Aug 2, 2024

@artembilan
Hi, I've looked into the above issue a bit more.
To start with the results: the issue is reproducible in v3.2, before we enhanced AbstractConsumerSeekAware, and I'm guessing it's reproducible in earlier versions as well.
The reproducer code is in the branch here.

Example description

  • Error message: No current assignment for partition Seek-6 ..
  • Error thread: org.springframework.kafka.KafkaListenerEndpointContainer#1-2-C-1 (Hereafter, we'll refer to it as KafkaListenerEndpointContainer#1-2 for convenience)
  • Cause of the error
    • This issue can be broken down into before and after the call to multiGroupListener.seekToBeginning() in the test code.
    • Before calling that method, the thread's consumer was assigned partitions 6, 7, and 8.
      Then the test calls seekToBeginning(), which adds partitions 6, 7, and 8 to that consumer's ListenerConsumer.seeks queue.
    • However, after seekToBeginning() returned, the ListenerConsumerRebalanceListener was invoked and the consumer's assignment changed to partition 7 only.
    • Partitions 6, 7, and 8 are still in the ListenerConsumer.seeks queue, but the consumer is no longer assigned partition 6 (nor 8), hence the error.
  • Workaround
    • A seek can only be applied if its partition has not been reassigned in the meantime (due to rebalancing, etc.).
    • I'm thinking about what we can do about this. If you have any suggestions, I'd love to hear them.
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#1-2, assigned: [6, 7, 8]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#1-2, assigned: [6, 7, 8]
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#1-2, assigned: [6, 7, 8], seeks: []

================= Where `ListenerConsumer.seeks` queue is appended. =====================================================
## [multiGroupListener.seekToBeginning()] start
---------------- [ListenerConsumer.seekToBeginning] id: org.springframework.kafka.KafkaListenerEndpointContainer#1-2, seeks: [6, 7, 8]
## [multiGroupListener.seekToBeginning()] end
==========================================================================================================================================

==========================================================================================================================================
=========== Before `ListenerConsumer.processSeeks()`, sometimes partitions are newly assigned, BUT `seeks` doesn't change.
=========== For this reason, the partition within seeks may not be newly assigned to the consumer, and you may see an error such as `No current assignment for partition Seek-N`.
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#1-2, assigned: [7]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#1-2, assigned: [7]
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#1-2, assigned: [7], seeks: [6, 7, 8]
2024-08-02 22:26:11,273 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#1-2-C-1] [org.springframework.kafka.listener.KafkaMessageListenerContainer] - Exception while seeking TopicPartitionOffset{topicPartition=Seek-6, offset=null, relativeToCurrent=false, position=BEGINNING}
java.lang.IllegalStateException: No current assignment for partition Seek-6
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367) ~[kafka-clients-3.7.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:650) ~[kafka-clients-3.7.0.jar:?]
	at java.base/java.util.Collections$SingletonList.forEach(Collections.java:5186) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:648) ~[kafka-clients-3.7.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.seekToBeginning(LegacyKafkaConsumer.java:804) ~[kafka-clients-3.7.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1178) ~[kafka-clients-3.7.0.jar:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processSeeks(KafkaMessageListenerContainer.java:3044) ~[main/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1401) ~[main/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1302) ~[main/:?]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]

@artembilan
Member Author

So, is the idea for the fix to seek only the partitions actually assigned and ignore those that are not assigned to us at the moment?

@bky373
Contributor

bky373 commented Aug 2, 2024

I thought about that, but it's not a perfect solution.
If we ignore a partition in the consumer's seeks because it is no longer assigned, the offset on that ignored partition never gets adjusted.

I'm thinking about what we can do about this. If you have any suggestions, I'd love to hear them.

@artembilan
Member Author

Isn't that what the partitions assigned/revoked API is for? Shouldn't we adjust our maps accordingly?
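One way to read "adjust our maps" is to drop revoked partitions from the owning callback's list. A minimal sketch, using a hypothetical `RevocationAwareRegistry` keyed by a `String` callback id (in the real class the key is a ConsumerSeekCallback) and a stand-in `TopicPartition` record:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of revocation-aware bookkeeping, not the framework code.
final class RevocationAwareRegistry {

    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) { }

    private final Map<String, List<TopicPartition>> callbackToTopics = new HashMap<>();

    void onPartitionsAssigned(String callbackId, Collection<TopicPartition> assigned) {
        this.callbackToTopics.computeIfAbsent(callbackId, key -> new ArrayList<>()).addAll(assigned);
    }

    // Scoped to one callback, so another group owning the same TopicPartition is unaffected.
    void onPartitionsRevoked(String callbackId, Collection<TopicPartition> revoked) {
        List<TopicPartition> owned = this.callbackToTopics.get(callbackId);
        if (owned != null) {
            owned.removeAll(revoked);
            if (owned.isEmpty()) {
                this.callbackToTopics.remove(callbackId); // nothing left to seek for this callback
            }
        }
    }

    List<TopicPartition> partitionsFor(String callbackId) {
        return List.copyOf(this.callbackToTopics.getOrDefault(callbackId, List.of()));
    }
}
```

As the next comment points out, this alone may not help if the container's own seeks queue still holds the stale partitions.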

@bky373
Contributor

bky373 commented Aug 2, 2024

We should also look at the maps used by the partitions assigned/revoked API, but I think we need to think more about how to change the ListenerConsumer.seeks queue. My impression is that changing the map but not the seeks would give the same result, but I'll check this a bit more.

@bky373
Contributor

bky373 commented Aug 3, 2024

This issue seems to occur frequently when concurrency grows. It happens with multi-group listeners, but it also happens with single-group listeners with high concurrency.

I'm wondering if the high concurrency is causing latency somewhere, and a rebalance is triggered when that latency exceeds some limit. I'd love to know where, but it's not easy to find.

Below is an error from a single-group consumer with 9 partitions and concurrency = 7.

The KafkaListenerEndpointContainer#0-4 consumer was initially assigned partition 8, which added 8 to its seeks queue, but after a rebalance it was assigned partition 6 instead, so the queued seek on partition 8 could no longer be applied.

## ConsumerStartedEvent [source=KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-0, clientIndex=-0, topicPartitions=[]]]
## ConsumerStartedEvent [source=KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-1, clientIndex=-1, topicPartitions=[]]]
## ConsumerStartedEvent [source=KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-2, clientIndex=-2, topicPartitions=[]]]
## ConsumerStartedEvent [source=KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-3, clientIndex=-3, topicPartitions=[]]]
## ConsumerStartedEvent [source=KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-4, clientIndex=-4, topicPartitions=[]]]
## ConsumerStartedEvent [source=KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-5, clientIndex=-5, topicPartitions=[]]]
## ConsumerStartedEvent [source=KafkaMessageListenerContainer [id=org.springframework.kafka.KafkaListenerEndpointContainer#0-6, clientIndex=-6, topicPartitions=[]]]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-0, assigned: [0, 1]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-2, assigned: [4, 5]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-1, assigned: [2, 3]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-4, assigned: [8]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-3, assigned: [6, 7]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-4, assigned: [8]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-1, assigned: [2, 3]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-2, assigned: [4, 5]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-3, assigned: [6, 7]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-0, assigned: [0, 1]
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-0, assigned: [0, 1], seeks: []
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-1, assigned: [2, 3], seeks: []
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-3, assigned: [6, 7], seeks: []
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-4, assigned: [8], seeks: []
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-2, assigned: [4, 5], seeks: []

================= Where `ListenerConsumer.seeks` queue is appended. =====================================================
## [multiGroupListener.seekToBeginning()] start
---------------- [ListenerConsumer.seekToBeginning] id: org.springframework.kafka.KafkaListenerEndpointContainer#0-3, seeks: [6, 7]
---------------- [ListenerConsumer.seekToBeginning] id: org.springframework.kafka.KafkaListenerEndpointContainer#0-0, seeks: [0, 1]
---------------- [ListenerConsumer.seekToBeginning] id: org.springframework.kafka.KafkaListenerEndpointContainer#0-2, seeks: [4, 5]
---------------- [ListenerConsumer.seekToBeginning] id: org.springframework.kafka.KafkaListenerEndpointContainer#0-1, seeks: [2, 3]
---------------- [ListenerConsumer.seekToBeginning] id: org.springframework.kafka.KafkaListenerEndpointContainer#0-4, seeks: [8]
## [multiGroupListener.seekToBeginning()] end
==========================================================================================================================================

==========================================================================================================================================
=========== Before `ListenerConsumer.processSeeks()`, sometimes partitions are newly assigned, BUT `seeks` doesn't change.
=========== For this reason, the partition within seeks may not be newly assigned to the consumer, and you may see an error such as `No current assignment for partition Seek-N`.
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-2, assigned: [4]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-1, assigned: [2, 3]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-4, assigned: [6]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-3, assigned: [5]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-6, assigned: [8]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-5, assigned: [7]
-- [ListenerConsumerRebalanceListener.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-0, assigned: [0, 1]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-4, assigned: [6]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-5, assigned: [7]
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-5, assigned: [7], seeks: []
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-6, assigned: [8]
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-6, assigned: [8], seeks: []
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-2, assigned: [4]
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-2, assigned: [4], seeks: [4, 5]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-3, assigned: [5]
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-3, assigned: [5], seeks: [6, 7]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-1, assigned: [2, 3]
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-4, assigned: [6], seeks: [8]
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-1, assigned: [2, 3], seeks: [2, 3]
-------- [AbstractConsumerSeekAware.onPartitionsAssigned] thread: KafkaListenerEndpointContainer#0-0, assigned: [0, 1]
------------ [ListenerConsumer.seekPartitions] thread: KafkaListenerEndpointContainer#0-0, assigned: [0, 1], seeks: [0, 1]
2024-08-03 13:19:07,945 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] [org.springframework.kafka.listener.KafkaMessageListenerContainer] - Exception while seeking TopicPartitionOffset{topicPartition=Seek-8, offset=null, relativeToCurrent=false, position=BEGINNING}
java.lang.IllegalStateException: No current assignment for partition Seek-8
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367) ~[kafka-clients-3.7.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.lambda$requestOffsetReset$3(SubscriptionState.java:650) ~[kafka-clients-3.7.0.jar:?]
	at java.base/java.util.Collections$SingletonList.forEach(Collections.java:5186) ~[?:?]
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:648) ~[kafka-clients-3.7.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.seekToBeginning(LegacyKafkaConsumer.java:804) ~[kafka-clients-3.7.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1178) ~[kafka-clients-3.7.0.jar:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processSeeks(KafkaMessageListenerContainer.java:3047) ~[main/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1404) ~[main/:?]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1305) ~[main/:?]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
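The assignments in this log are consistent with a range-style split of 9 partitions: with 5 members, [0, 1], [2, 3], [4, 5], [6, 7], [8]; once all 7 members join, [0, 1], [2, 3], [4], [5], [6], [7], [8]. The following plain-Java model is hypothetical (it does not call Kafka's assignor classes and assumes members are ordered by container index) and recomputes both assignments to show which queued seeks no longer belong to each consumer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the log above; not Kafka's RangeAssignor implementation.
final class StaleSeeksModel {

    // Range-style split: the first (partitions % members) members get one extra partition.
    static List<List<Integer>> rangeAssign(int partitions, int members) {
        List<List<Integer>> result = new ArrayList<>();
        int base = partitions / members;
        int extra = partitions % members;
        int next = 0;
        for (int m = 0; m < members; m++) {
            int count = base + (m < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    // Seeks queued against the early assignment that the later assignment no longer covers.
    static Map<Integer, List<Integer>> staleSeeks(int partitions, int earlyMembers, int allMembers) {
        List<List<Integer>> before = rangeAssign(partitions, earlyMembers);
        List<List<Integer>> after = rangeAssign(partitions, allMembers);
        Map<Integer, List<Integer>> stale = new LinkedHashMap<>();
        for (int m = 0; m < earlyMembers; m++) {
            List<Integer> queued = new ArrayList<>(before.get(m)); // seekToBeginning() on the old assignment
            queued.removeAll(after.get(m));                         // keep only partitions no longer owned
            if (!queued.isEmpty()) {
                stale.put(m, queued);
            }
        }
        return stale;
    }
}
```

`staleSeeks(9, 5, 7)` yields `{2=[5], 3=[6, 7], 4=[8]}`. Consumer #0-4's stale seek on partition 8 matches the `Seek-8` error; the model suggests #0-2 and #0-3 are in the same situation, although the excerpt above only shows the `Seek-8` failure.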

@bky373
Contributor

bky373 commented Aug 6, 2024

@artembilan
IMHO, at runtime a rebalance may occur just as (or just before) you try to reset the offset, for example due to an increase in the number of consumers or partitions, so the rebalance listener's partition assignment method is called in between. (I'm not sure exactly why this happens, other than that high concurrency makes it more likely.)
In this case, the partitions are reassigned and you may not be able to adjust the offset as you want, but Spring Kafka tells you in the error log which partition failed.
Therefore, users should be aware of this possibility in advance and have an alternative, such as re-requesting the seek for the failed partition.
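A minimal application-side sketch of the re-request idea, assuming the application can somehow learn which partitions failed (how it finds out is left open). `PendingSeekRetry` is hypothetical and not part of Spring for Apache Kafka; the `seekToBeginning` argument stands in for something like a ConsumerSeekCallback seek. A single instance shared by all containers of a listener lets whichever consumer receives the partition next replay the seek.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical application-side helper; not part of Spring for Apache Kafka.
final class PendingSeekRetry {

    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) { }

    private final Set<TopicPartition> pending = new LinkedHashSet<>();

    // Remember partitions whose seek could not be applied (e.g. after a rebalance).
    synchronized void markFailed(Collection<TopicPartition> failed) {
        this.pending.addAll(failed);
    }

    // On a later assignment, replay the seek for pending partitions this consumer now owns.
    synchronized List<TopicPartition> onPartitionsAssigned(Collection<TopicPartition> assigned,
            Consumer<TopicPartition> seekToBeginning) {
        List<TopicPartition> retried = new ArrayList<>();
        for (TopicPartition tp : assigned) {
            if (this.pending.remove(tp)) {
                seekToBeginning.accept(tp);
                retried.add(tp);
            }
        }
        return retried;
    }
}
```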

Getting an error here isn't unreasonable in itself, but the official documentation doesn't mention this possibility, so I think it would be nice to add a cautionary note to the documentation. What do you think?

@artembilan
Member Author

Thank you, @bky373 , for investigation!

So, according to your findings, the problem is not specific to Spring for Apache Kafka: it is really reproducible with the plain Apache Kafka client, given the respective concurrency and seeking.
Handling it is probably left entirely up to the target project's implementation.

So, how about, instead of throwing that exception up to the listener container thread, we just emit a WARN into the logs and ignore it?
It is a really volatile illegal state, but apparently it is out of our client's control: the broker may decide to evict this consumer from the group at any moment.
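The logs in this thread show `processSeeks()` reporting the failure at ERROR ("Exception while seeking ..."). A minimal sketch of the WARN-and-continue idea follows. `WarnOnMissingAssignment` is hypothetical, the `seek` argument stands in for a call such as `consumer.seekToBeginning(List.of(tp))`, and `System.Logger` (whose level is named `WARNING`) is used only to keep the example dependency-free.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a seek loop that downgrades the rebalance race to a warning.
final class WarnOnMissingAssignment {

    private static final System.Logger LOGGER = System.getLogger(WarnOnMissingAssignment.class.getName());

    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) { }

    // Applies each queued seek; partitions that are no longer assigned are logged and skipped.
    static List<TopicPartition> processSeeks(List<TopicPartition> seeks, Consumer<TopicPartition> seek) {
        List<TopicPartition> skipped = new ArrayList<>();
        for (TopicPartition tp : seeks) {
            try {
                seek.accept(tp);
            }
            catch (IllegalStateException ex) {
                // The partition was reassigned after the seek was queued: not an application error.
                LOGGER.log(System.Logger.Level.WARNING,
                        "No current assignment for partition " + tp + "; skipping seek: " + ex.getMessage());
                skipped.add(tp);
            }
        }
        return skipped;
    }
}
```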

Another mitigation could be done via the onPartitionsRevoked/onPartitionsLost callbacks, to clean those partitions out of our seeks and avoid a possible WARN on the next poll.

According to the ConsumerRebalanceListener Javadocs, it is called from within Consumer.poll(), so it should be safe to modify our seeks queue there, since the queue is only used on the next poll.
It might be turned into a LinkedHashMap for easier handling.
However, it might also need to be guarded by a dedicated Lock, since all those seek...() methods might be called from different threads.
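A minimal sketch of that suggestion: pending seeks kept in a `LinkedHashMap` keyed by partition and guarded by a `ReentrantLock`. `GuardedSeeks`, its `Position` enum, and the method names are hypothetical; in the container, requests could arrive from any thread while revocation runs on the consumer thread inside `poll()`.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: pending seeks keyed by partition and guarded by a lock.
final class GuardedSeeks {

    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) { }

    enum Position { BEGINNING, END }

    private final Map<TopicPartition, Position> seeks = new LinkedHashMap<>();

    private final Lock lock = new ReentrantLock();

    // May be called from any thread; a later request for the same partition replaces the earlier position.
    void request(TopicPartition tp, Position position) {
        this.lock.lock();
        try {
            this.seeks.put(tp, position);
        }
        finally {
            this.lock.unlock();
        }
    }

    // Called from onPartitionsRevoked/onPartitionsLost, i.e. inside poll() on the consumer thread.
    void onPartitionsRevokedOrLost(Collection<TopicPartition> partitions) {
        this.lock.lock();
        try {
            partitions.forEach(this.seeks::remove);
        }
        finally {
            this.lock.unlock();
        }
    }

    // Takes a snapshot of the pending seeks for the next processing pass and clears them.
    Map<TopicPartition, Position> drain() {
        this.lock.lock();
        try {
            Map<TopicPartition, Position> copy = new LinkedHashMap<>(this.seeks);
            this.seeks.clear();
            return copy;
        }
        finally {
            this.lock.unlock();
        }
    }
}
```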

Another way is just to ignore onPartitionsRevoked/onPartitionsLost and filter the seeks loop according to the actual assignment.
I don't see a reason to warn the end user that a partition we would like to seek is no longer under our control.
The race condition with the broker, however, is really worth notifying about, at least as a WARN in the logs.
We may think about some special PartitionNotAssignedEvent, but that might be done later, on demand.
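The filter-by-assignment option can be sketched as splitting the queued seeks against the consumer's current assignment before seeking. `AssignmentFilter` is hypothetical; in real code the assignment set could come from `Consumer.assignment()`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: split queued seeks by the consumer's current assignment.
final class AssignmentFilter {

    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) { }

    // true -> still assigned, safe to seek; false -> reassigned, skip (optionally with a WARN).
    static Map<Boolean, List<TopicPartition>> split(List<TopicPartition> seeks, Set<TopicPartition> assignment) {
        return seeks.stream().collect(Collectors.partitioningBy(assignment::contains));
    }
}
```

`Collectors.partitioningBy` always returns both `true` and `false` keys, so the caller can seek the first list and report the second without null checks.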

WDYT?

@bky373
Contributor

bky373 commented Aug 7, 2024

@artembilan
Thanks for your comment!

Hmm... I guess I'm still not convinced. I'm still concerned about whether WARN is the right log level.
End-users will expect and use AbstractConsumerSeekAware for the benefit of being able to move the offset in a non-disruptive way at runtime, and the current ERROR log seems good enough to let them know that the seeking failed.

Considering that resetting the offset is usually done to (re)process messages, if it fails we should probably take immediate action, such as a re-request; in this respect, I think ERROR is explicit. But perhaps there is something that makes it seem unreasonable. Could you tell me a little bit more about why?

I would greatly appreciate your response, as it would help me understand better! Thank you, as always.

@artembilan
Member Author

Well, an ERROR means something is wrong with the application.
In our case, the partition assignment on the broker is out of our control.
At least in this race condition, where we have already sent the seek command but haven't received onPartitionsRevoked/onPartitionsLost yet.
I don't know the Kafka algorithm in detail, but we can receive onPartitionsRevoked/onPartitionsLost only when we call poll.
At the same time, we don't want to call poll because we want to seek before that.
So, that is the kind of race condition we are facing.
(Don't get me wrong, but that's my impression of what we've got here.)

Since this is not caused by the application, ERROR would be misleading.
WARN is totally OK to notify that the state has changed and we cannot seek partitions we don't own anymore.
Therefore, something else has to be done in the application to mitigate the situation.
But, no, that is indeed not an ERROR, and the application logic is OK.

And that's why I suggested the PartitionNotAssignedEvent, to let application logic deal with this race condition around partitions revoked during a seek.

Does that make sense?

@bky373
Contributor

bky373 commented Aug 7, 2024

Thanks! That helped me understand.
I guess it would be beneficial to test further when onPartitionsRevoked/onPartitionsLost events occur, as my previous output focused solely on partition assignment. I'll proceed with this and leave another comment afterward!
Due to time zone differences, it might take up to a day before I can get back to you.

@bky373
Contributor

bky373 commented Aug 8, 2024

@artembilan
After some more testing, the scenario for partition reassignment became a bit clearer.

Assume the following arbitrary conditions.

  • partitions = 9
  • concurrency = 7

1. Consumers started

  • You can tell by the ConsumerStartedEvent being published.

2. Leading consumers join the group and monopolize the partitions

  • Some fast-moving consumers are assigned all partitions.
  • The broker has the property group.initial.rebalance.delay.ms (default: 3000 ms). Only consumers that send JoinGroup requests within this time are assigned partitions, while the others wait until the next rebalance occurs.

3. Store partition offsets for seek

  • This is done when you run the seek...() method.
  • The seek requests are stored in the seeks queue for the partitions assigned in step 2.
  • Naturally, slow consumers have nothing stored in seeks because they have no partition assigned to them.

4. Rebalancing consumers

  • Before the next pollAndInvoke(), the slow consumers join the group late.
  • Partition revocation/reassignment takes place at this time.

5. Errors during seeking

  • The rebalancing in step 4 changes the previously assigned partitions of the leading consumers to new partitions.
  • As a result, at the next poll the newly assigned partitions differ from the partitions in seeks, resulting in the error.

Now that I've found the cause of the error, I'll write a PR that includes two things as soon as I can.

  • Setting the group.initial.rebalance.delay.ms value generously so that the existing test code always succeeds without error under high concurrency (sample).
  • Using WARN logs instead of ERROR when the error occurs in step 5 (sample).
    • A log example:
      2024-08-09 01:20:41,787 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#1-2-C-1] - No current assignment for partition Seek-5 due to partition reassignment prior to seeking.

And I didn't use the PartitionNotAssignedEvent because I wasn't sure how to use it well.

@artembilan
Member Author

I think I'm fine with the plan.
Let's see a PR from you!
WARN is enough for now: we can come back to the PartitionNotAssignedEvent when someone requests it.
