Do network connections and writes in KafkaClient.poll() #1729
Conversation
Looks good.
if blocking:
    error = self.send_pending_requests()
    if isinstance(error, Exception):
        future.failure(error)
Perhaps add a debug-level log statement here for this error? It looks like send_pending_requests() already logs most (but not all) errors, so this may be superfluous, but my one thought is that if someone files a ticket, we have a little more visibility into the errors they're hitting...
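The suggestion above can be sketched as follows. This is an illustrative pattern, not kafka-python's actual code: `Future`, `send_blocking`, and `_FailingConn` are hypothetical stand-ins, and only the shape of the `send_pending_requests()` error check mirrors the diff.

```python
import logging

log = logging.getLogger(__name__)

class Future:
    """Minimal stand-in for a request future (hypothetical)."""
    def __init__(self):
        self.exception = None

    def failure(self, exc):
        self.exception = exc
        return self

def send_blocking(conn, future):
    # Flush queued bytes; if the flush reported an error, log it at
    # debug level (the reviewer's suggestion) before failing the future.
    error = conn.send_pending_requests()
    if isinstance(error, Exception):
        log.debug('Error sending pending requests: %s', error)
        future.failure(error)
    return future

class _FailingConn:
    """Fake connection whose flush always reports an error."""
    def send_pending_requests(self):
        return ConnectionError('broken pipe')

result = send_blocking(_FailingConn(), Future())
```

The debug log is cheap and only fires on the error path, so the "superfluous" cost the reviewer worries about is minimal.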
    future.failure(error)
return future

log.debug('%s Request %d: %s', self, correlation_id, request)
Should this log line be located above the if blocking: line? The info seems useful regardless of whether blocking is set.
In the current implementation we only log this after the request is sent successfully. So I put this after the blocking section to keep it consistent.
except ConnectionError as e:
-    log.exception("Error sending %s to %s", request, self)
+    log.exception("Error sending request data to %s", self)
Is there a reason to stop logging the request value?
With this design we only have the encoded bytes at this stage -- we no longer have the original request object -- so for that reason I took it out of the log message. This error should be sent down to the future, and we can expect that the error handler for the request future will be responsible for logging the details.
-"""Send a request to a specific node.
+"""Send a request to a specific node. Bytes are placed on an
+internal per-connection send-queue. Actual network I/O will be
+triggered in a subsequent call to .poll()
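The queue-then-flush split described in that docstring can be illustrated with a toy connection. This is a minimal sketch of the pattern, not kafka-python's BrokerConnection; `wire` is a hypothetical stand-in for the socket.

```python
from collections import deque

class Connection:
    """Toy connection: send() only queues bytes; send_pending_requests()
    (which poll() would trigger) performs the actual 'I/O'."""
    def __init__(self):
        self._send_queue = deque()
        self.wire = bytearray()  # stands in for the real socket

    def send(self, request_bytes):
        # No network I/O here -- just queue the encoded request.
        self._send_queue.append(request_bytes)

    def send_pending_requests(self):
        # Triggered from poll(): flush everything queued so far.
        while self._send_queue:
            self.wire.extend(self._send_queue.popleft())

conn = Connection()
conn.send(b'request-1')
queued_before_poll = bytes(conn.wire)   # nothing on the wire yet
conn.send_pending_requests()            # what a poll() call would do
sent_after_poll = bytes(conn.wire)
```

The key property is visible in the two snapshots: nothing reaches the wire until the flush runs, which is exactly why callers must keep calling .poll().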
Does this create any new scenarios where we now need to make sure to call poll()? In other words, will this break any behavior that used to work fine w/o ever calling poll()? Currently, I can't think of any -- looks like metadata refresh picks this up automatically since it relies on maybe_connect, and you also updated the fetcher to always call poll(), just wondering if there might be any others...
It is possible, yes, but such cases should be rare. The main culprits would be blocking loops attempting to connect without calling poll(), or code that only calls poll() when there are in-flight requests. I think I've found and fixed those issues, but definitely keep an eye open for others.
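The "blocking loop without poll()" failure mode described above can be made concrete with a toy client. This is a hedged sketch under the assumption that, as in this PR, connection attempts only progress inside poll(); `ToyClient`, `is_ready`, and `timeout_ms` are hypothetical, not the real KafkaClient API.

```python
class ToyClient:
    """Hypothetical client where the connection state machine only
    advances inside poll(), mirroring the behavior change above."""
    def __init__(self):
        self._connected = False

    def poll(self, timeout_ms=0):
        # Connection attempts are driven here and nowhere else.
        self._connected = True

    def is_ready(self, node_id):
        return self._connected

client = ToyClient()
# Broken pattern after this change: `while not client.is_ready(0): pass`
# would spin forever. The fix is to drive I/O inside the wait loop:
while not client.is_ready(0):
    client.poll(timeout_ms=100)
```

Any wait loop that previously relied on connections progressing "in the background" now has to call poll() each iteration, which is the class of bug the reviewer is asking about.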
Force-pushed from 3fd7d09 to 0a8914f.
I rebased to pick up the latest changes.
I'm going to be out of town / offline for the next week or so. Tests are passing, and I'm satisfied with where this is at for now, so I'm going to merge to master. Feel free to post more feedback here that you collect from real-world usage.
Sounds good, thanks for all the hard work here.
This PR completes #981 and attempts to address several reports of consumer lockups. These appear related to KafkaClient blocking for up to the full request_timeout_ms while holding the client lock, preventing all other threads from initiating new network requests or connections until the lock is released.
There are 4 commits here. The first is a simple refactor of BrokerConnection that separates queueing of a new network request (via .send / ._send) and performing the actual network IO (via .send_pending_requests).
The second updates KafkaClient to only perform network I/O during .poll(). It uses the wakeup channel to signal between threads, allowing a sender to wake up a blocked poller and trigger an immediate call to .send_pending_requests().
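A wakeup channel of the kind described above can be sketched with a socketpair: the sender writes one byte so a poller blocked in select() returns immediately instead of waiting out its timeout. This is illustrative only; kafka-python's actual wakeup implementation differs in detail, and `wakeup`/`poll` here are hypothetical free functions.

```python
import select
import socket

# One end for the poller thread, one for sender threads.
poller_end, sender_end = socket.socketpair()

def wakeup():
    # A sender writes a single byte to interrupt a blocked poller.
    sender_end.send(b'x')

def poll(timeout):
    # Block until data arrives on the wakeup channel or we time out.
    readable, _, _ = select.select([poller_end], [], [], timeout)
    if poller_end in readable:
        poller_end.recv(1)  # drain the wakeup byte
        return True         # woke early: time to flush pending requests
    return False            # plain timeout, nothing to do

wakeup()
woke_early = poll(timeout=5.0)
```

Because the wakeup byte is already queued, the select() returns immediately rather than after five seconds, which is how a sender avoids waiting for a poller holding a long timeout.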
The third commit addresses network connection management, separate from network writes. It updates KafkaClient.send() to only acquire the client lock when a new connection is needed.
And the fourth commit completes the transition by moving all connection attempts via _maybe_connect into KafkaClient.poll(), which should eliminate the thread contention between a thread that is polling and some other thread that wants to initiate network IO.
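The lock-scoping change in the third commit can be sketched as follows: only the rare connection-setup path takes the client lock, while queueing onto an existing connection does not. This is a hypothetical illustration, not the real KafkaClient; connections are modeled as plain lists of queued payloads.

```python
import threading

class LockScopedClient:
    """Sketch: the client lock guards connection creation only,
    not the common path of queueing a request."""
    def __init__(self):
        self._lock = threading.Lock()
        self._conns = {}  # node_id -> list of queued payloads

    def send(self, node_id, payload):
        conn = self._conns.get(node_id)
        if conn is None:
            # Rare path: creating a connection needs the client lock.
            with self._lock:
                # setdefault avoids clobbering a connection another
                # thread created between our get() and the lock.
                conn = self._conns.setdefault(node_id, [])
        # Common path: queue without holding the client lock.
        conn.append(payload)
        return conn

client = LockScopedClient()
queue = client.send(0, b'request')
```

Keeping the lock off the hot path is what prevents one thread's slow connection attempt from stalling every other sender, which is the contention the PR description is targeting.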