-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
KAFKA-18645: New consumer should align close timeout handling with classic consumer #18702
Conversation
b100413
to
0cfe496
Compare
0cfe496
to
8675c7b
Compare
So given this configuration:
When the user calls The change to the timeout behavior was introduced relatively recently in 3.5 via KAFKA-7109. Looking at #12590, I'm not sure the change to ignore the user's timeout was necessarily intentional. When closing, individual network requests should adhere to |
8675c7b
to
de789f2
Compare
My understanding is that this behaviour on close regarding the timeout has been in place for a long time (introduced here f72203e). The fetch improvements of KAFKA-7109 seems to just apply to the fetch requests on close the same timeout principle that was being applied to the coordinator close. Makes sense? I'll be taking a full pass today. Thanks! |
According to the commit you mentioned, I think you are right.
This reminds me that the patch applied this close timeout principle to all closing behavior and I'm not sure if it is what we want. IIRC, classic consumer only used the timer for the coordinator and fetcher. 🤔 |
True, but let's keep in mind that translating that to the new consumer means that we should apply the min to the steps to commit, leave, fetcher close, and network client close I expect (because this last step will wait for any of the requests generated on the previous steps). Makes sense? (Agree that we shouldn't apply it blindly) |
Yes, thanks for the further explanation. |
de789f2
to
a4e4bf5
Compare
That said, let's fix this gap with the solution as proposed. No one has complained about the behavior of the existing consumer timeout. I have no intention of dying on this hill; I just personally find it confusing 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @frankvicky.
I'd like to see a sanity check unit test added to somewhere like KafkaConsumerTest
that ensures that the value of request.timeout.ms
is used over the timeout passed in. If that's the intended and documented behavior, we should validate it to be so.
Thanks!
a4e4bf5
to
a0a2920
Compare
Hi @kirktrue
I think it's not easy to check it. Could we rely on |
If it's a lot of work, I guess we can skip it and assume |
944a037
to
e27781a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @frankvicky !
@@ -1326,7 +1331,7 @@ private void close(Duration timeout, boolean swallowException) { | |||
// We are already closing with a timeout, don't allow wake-ups from here on. | |||
wakeupTrigger.disableWakeups(); | |||
|
|||
final Timer closeTimer = time.timer(timeout); | |||
final Timer closeTimer = createTimer(timeout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not introduced here but affected with this change. I notice that runRebalanceCallbacksOnClose
consumes time from the close timeout, right? (receives the timer just to update it). But that behaviour is not the same in the Classic Consumer.
In the classic, the close timeout only applies to requests really. The callbacks run when closing the Abstract coordinator, without time boundaries, and most importantly, without consuming time from the close timeout. We runRebalanceCallbacksOnClose
without time boundaries too, but it does consume the time from the timeout param, right? Wouldn't that potentially leave less time for the following requests? I'm concerned about existing apps, running callbacks, calling close with a timeout that used to be "enough", but now it may not be. Should we simply remove the timer from the runRebalanceCallbacksOnClose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, just walk through the corresponding logic of the classic consumer.
The callback on close doesn't consume the time for the timer, to align the behavior, I think it's ok to remove the timer from runRebalanceCallbacksOnClose
@@ -153,7 +153,8 @@ public class CommonClientConfigs { | |||
public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait " | |||
+ "for the response of a request. If the response is not received before the timeout " | |||
+ "elapses the client will resend the request if necessary or fail the request if " | |||
+ "retries are exhausted."; | |||
+ "retries are exhausted. This timeout also applies to the consumer close operation - " | |||
+ "even if a larger timeout is specified for close, it will be limited by this value."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well just for our understanding and related to my other comment: the classic behaviour we're trying to keep is that the request timeout applies to operations performed with the coordinator and the leader (coordinator-related requests and fetch sessions), not to other close operations that do not perform any request. I'm not suggesting to add any of that here, would polute this config doc imo. Actually, since it's really specific to consumer.close, isn't it enough to explain the behaviour on the close API java doc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't it enough to explain the behaviour on the close API java doc?
+1 to add docs to close API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's crucial to highlight that neither the OffsetCommitCallback
nor the ConsumerRebalanceListener
callbacks consume time from the close timeout.
e27781a
to
62b3dc2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @frankvicky !
@@ -1782,6 +1782,13 @@ public void close() { | |||
* timeout. If the consumer is unable to complete offset commits and gracefully leave the group | |||
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be | |||
* used to interrupt close. | |||
* <p> | |||
* The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which | |||
* only applies to operations performed with the coordinator (coordinator-related requests and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we say this instead? (could be to the coordinator or the leader)
* only applies to operations performed with the coordinator (coordinator-related requests and | |
* only applies to operations performed with the broker (coordinator-related requests and |
@@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) { | |||
} | |||
} | |||
|
|||
private Timer createTimer(Duration timeout) { | |||
return time.timer(Duration.ofMillis(Math.min(timeout.toMillis(), requestTimeoutMs))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return time.timer(Duration.ofMillis(Math.min(timeout.toMillis(), requestTimeoutMs))); | |
return time.timer(Math.min(timeout.toMillis(), requestTimeoutMs)); |
@@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) { | |||
} | |||
} | |||
|
|||
private Timer createTimer(Duration timeout) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we show this is for the close op? createTimerForCloseRequests
or similar maybe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is nice to follow the style of classic consumers.
@@ -1368,6 +1373,10 @@ private void close(Duration timeout, boolean swallowException) { | |||
} | |||
} | |||
|
|||
private Timer createTimer(Duration timeout) { | |||
return time.timer(Duration.ofMillis(Math.min(timeout.toMillis(), requestTimeoutMs))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a null check to the time obj here? Similar to what the classic does, since close can be called from the constructor at any point (finally) if something fails.
390cb14
to
c62499f
Compare
@frankvicky could you please merge trunk latest changes? The flaky/failing transactions test has been fixed with #18793 Thanks! |
…assic consumer JIRA: KAFKA-18645 see discussion: apache#18590 (comment) In the classic consumer, the timeout respects request.timeout.ms. However, in the async consumer, this logic is either missing or only applies to individual requests. Unlike the classic consumer, where request.timeout.ms works for the entire coordinator closing behavior, the async implementation handles timeouts differently. We should align the close timeout-handling to enable ConsumerBounceTest#testClose
c62499f
to
ca8c76e
Compare
Sure! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! LGTM
…assic consumer (#18702) Reviewers: Lianet Magrans <lmagrans@confluent.io>, Kirk True <ktrue@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Merged to trunk and cherry picked to 4.0 |
…assic consumer (apache#18702) Reviewers: Lianet Magrans <lmagrans@confluent.io>, Kirk True <ktrue@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
JIRA: KAFKA-18645
see discussion:
#18590 (comment)
In the classic consumer, the timeout respects
request.timeout.ms
. However, in the async consumer, this logic is either missing or only applies to individual requests. Unlike the classic consumer, whererequest.timeout.ms
works for the entire coordinator closing behavior, the async implementation handles timeouts differently.We should align the close timeout-handling to enable
ConsumerBounceTest#testClose
Committer Checklist (excluded from commit message)