Skip to content

Commit

Permalink
KAFKA-18645: New consumer should align close timeout handling with cl…
Browse files Browse the repository at this point in the history
…assic consumer

JIRA: KAFKA-18645
see discussion:
#18590 (comment)

In the classic consumer, the close timeout respects request.timeout.ms.
However, in the async consumer, this logic is either missing or only
applies to individual requests. In the classic consumer,
request.timeout.ms bounds the entire coordinator close, whereas the
async implementation does not apply such an overall bound.

We should align the close timeout handling so that
ConsumerBounceTest#testClose can be enabled for all group protocols.
  • Loading branch information
frankvicky committed Jan 26, 2025
1 parent a8f6fc9 commit 8675c7b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public class CommonClientConfigs {
public static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the client will wait "
+ "for the response of a request. If the response is not received before the timeout "
+ "elapses the client will resend the request if necessary or fail the request if "
+ "retries are exhausted.";
+ "retries are exhausted. This timeout also applies to the consumer close operation - "
+ "even if a larger timeout is specified for close, it will be limited by this value.";

public static final String DEFAULT_LIST_KEY_SERDE_INNER_CLASS = "default.list.key.serde.inner";
public static final String DEFAULT_LIST_KEY_SERDE_INNER_CLASS_DOC = "Default inner class of list serde for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1782,6 +1782,10 @@ public void close() {
* timeout. If the consumer is unable to complete offset commits and gracefully leave the group
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
* used to interrupt close.
* <p>
* The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting. Even if a
* larger timeout is specified, the consumer will not wait longer than {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG}
* for any request to complete during the close operation.
*
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
private final ConsumerMetadata metadata;
private final Metrics metrics;
private final long retryBackoffMs;
private final int requestTimeoutMs;
private final Duration defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private volatile boolean closed = false;
Expand Down Expand Up @@ -323,6 +324,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
this.metrics = createMetrics(config, time, reporters);
this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);

List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
this.interceptors = new ConsumerInterceptors<>(interceptorList);
Expand Down Expand Up @@ -446,6 +448,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
SubscriptionState subscriptions,
ConsumerMetadata metadata,
long retryBackoffMs,
int requestTimeoutMs,
int defaultApiTimeoutMs,
String groupId,
boolean autoCommitEnabled) {
Expand All @@ -465,6 +468,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
this.groupMetadata.set(initializeGroupMetadata(groupId, Optional.empty()));
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
this.deserializers = deserializers;
this.applicationEventHandler = applicationEventHandler;
Expand Down Expand Up @@ -498,6 +502,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
this.metrics = new Metrics(time);
this.metadata = metadata;
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
this.clientTelemetryReporter = Optional.empty();
Expand Down Expand Up @@ -1325,7 +1330,7 @@ private void close(Duration timeout, boolean swallowException) {
// We are already closing with a timeout, don't allow wake-ups from here on.
wakeupTrigger.disableWakeups();

final Timer closeTimer = time.timer(timeout);
final Timer closeTimer = createTimer(timeout);
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
closeTimer.update();
// Prepare shutting down the network thread
Expand Down Expand Up @@ -1365,6 +1370,10 @@ private void close(Duration timeout, boolean swallowException) {
}
}

/**
 * Creates the timer used to bound the close operation.
 *
 * @param timeout the close timeout requested by the caller
 * @return a timer whose duration is the smaller of {@code timeout} (in milliseconds)
 *         and {@code requestTimeoutMs}
 */
private Timer createTimer(Duration timeout) {
    // Cap the requested close timeout at the request timeout.
    final long boundedTimeoutMs = Math.min(timeout.toMillis(), requestTimeoutMs);
    return time.timer(Duration.ofMillis(boundedTimeoutMs));
}

private void autoCommitOnClose(final Timer timer) {
if (groupMetadata.get().isEmpty())
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(
String clientId,
boolean autoCommitEnabled) {
long retryBackoffMs = 100L;
int requestTimeoutMs = 30000;
int defaultApiTimeoutMs = 1000;
return new AsyncKafkaConsumer<>(
new LogContext(),
Expand All @@ -261,6 +262,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(
subscriptions,
metadata,
retryBackoffMs,
requestTimeoutMs,
defaultApiTimeoutMs,
groupId,
autoCommitEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testClose(quorum: String, groupProtocol: String): Unit = {
val numRecords = 10
val producer = createProducer()
Expand Down

0 comments on commit 8675c7b

Please sign in to comment.