Add wakeup to Consumer as in Java Client #1148

Open
tvoinarovskyi opened this issue Jul 14, 2017 · 2 comments

Comments

@tvoinarovskyi
Collaborator

The Java client has a wakeup() method that can be used to break out of a poll() and can be called from another thread. It's needed for some specific cases, like the process pool processing example, where we have one consumer and a pool of workers.

@dpkp
Owner

dpkp commented Oct 10, 2017

kafka-python supports wakeup via a socketpair in KafkaClient. When a KafkaClient instance shared between threads is blocked waiting for IO, this lets another thread signal that it should wake up for other processing.
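
For readers unfamiliar with the socketpair technique, here is a minimal, generic sketch of it. This is not kafka-python's actual code: the `Waker` class and its method names are hypothetical, and it only assumes the standard-library `socket` and `selectors` modules.

```python
import selectors
import socket
import threading
import time


class Waker:
    """Hypothetical helper: lets another thread interrupt a blocking select()."""

    def __init__(self):
        # Two connected sockets: writing to one makes the other readable.
        self._recv, self._send = socket.socketpair()
        self._recv.setblocking(False)
        self._selector = selectors.DefaultSelector()
        self._selector.register(self._recv, selectors.EVENT_READ)

    def wakeup(self):
        # Can be called from any thread; one byte is enough to wake the waiter.
        self._send.send(b"x")

    def wait(self, timeout):
        """Block for up to `timeout` seconds; return True if woken early."""
        events = self._selector.select(timeout)
        if not events:
            return False
        # Drain pending wakeup bytes so that the next wait() blocks again.
        try:
            while self._recv.recv(1024):
                pass
        except BlockingIOError:
            pass
        return True

    def close(self):
        self._selector.close()
        self._recv.close()
        self._send.close()
```

In a real client the selector would also watch the broker sockets, so a `wakeup()` call from another thread returns control to the IO loop even when no network data arrives.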

There is also a related, but slightly different, "wakeup" feature in the Java KafkaConsumer class. This allows threads that share a KafkaConsumer instance to trigger a WakeupException to be raised from the next consumer.poll() call. As far as I can tell, this is used primarily to signal a looping consumer that it should shut down. But I think the same could be done with simple external concurrency primitives, like a shared threading.Event() perhaps.
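
A rough sketch of the shared `threading.Event()` idea, for illustration only. The function name `consume_until_stopped` and the `handle` callback are hypothetical; `consumer` is assumed to be a kafka-python `KafkaConsumer` (only `poll(timeout_ms=...)` and `close()` are used).

```python
import threading


def consume_until_stopped(consumer, stop_event, handle, poll_timeout_ms=500):
    """Poll `consumer` until `stop_event` is set, then close it."""
    try:
        while not stop_event.is_set():
            # poll() returns a dict mapping each partition to a list of records.
            batches = consumer.poll(timeout_ms=poll_timeout_ms)
            for records in batches.values():
                for record in records:
                    handle(record)
    finally:
        consumer.close()
```

Another thread requests shutdown by calling `stop_event.set()`. Unlike a real wakeup, this does not interrupt a poll() that is already in progress, so the poll timeout bounds how long the stop request can go unnoticed.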

You filed this issue wrt the second (KafkaConsumer.wakeup()), correct?

@tvoinarovskyi
Collaborator Author

tvoinarovskyi commented Oct 11, 2017

Some time ago I wrote this gist: https://gist.github.com/tvoinarovskyi/05a5d083a0f96cae3e9b4c2af580be74. It lets the consumer delegate consumed requests to queues and consume from those queues in separate threads. The upside here is that we pause any partitions that still have data in queues pending processing, so we can basically have a background heartbeat by calling poll(0) with all partitions paused.
The problem here is that when a thread finishes processing data from a queue, it needs to notify the consumer so that the consumer unpauses that partition. In Java, the wakeup method can be used to do that, but in kafka-python I can't do it with the public API.
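
To make the gap concrete, here is a sketch of the closest approximation I can think of with the public API only. It is not the code from the gist: `poll_with_resume`, `work_queues` (a dict of per-partition queues) and `done` (a queue on which workers put a partition once they have finished it) are hypothetical names. It relies only on `KafkaConsumer.poll()`, `pause()` and `resume()`.

```python
import queue


def poll_with_resume(consumer, work_queues, done, poll_timeout_ms=100):
    """One loop iteration: resume finished partitions, poll, dispatch, pause."""
    # Collect notifications from worker threads without blocking.
    finished = []
    while True:
        try:
            finished.append(done.get_nowait())
        except queue.Empty:
            break
    if finished:
        consumer.resume(*finished)

    # Without a wakeup, a worker cannot interrupt this call; the timeout
    # bounds how long a finished partition stays paused.
    batches = consumer.poll(timeout_ms=poll_timeout_ms)
    for tp, records in batches.items():
        work_queues[tp].put(records)
        # Stop fetching this partition until its worker reports back.
        consumer.pause(tp)
    return finished
```

The consumer thread would call this in a loop. The cost is the trade-off between a short timeout (more idle polling) and a long one (slower resume), which is exactly what a wakeup() call would avoid.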
