Skip to content

Commit

Permalink
MINOR: Fix broken test (#18062)
Browse files Browse the repository at this point in the history
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>
  • Loading branch information
lianetm authored Dec 6, 2024
1 parent 3a04990 commit 36b4853
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public class ConsumerGroupHeartbeatRequest extends AbstractRequest {
*/
public static final int CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION = 1;

public static final String REGEX_RESOLUTION_NOT_SUPPORTED_MSG = "The cluster does not support " +
"regular expressions resolution on ConsumerGroupHeartbeat API version 0. It must be upgraded to use " +
"ConsumerGroupHeartbeat API version >= 1 to allow to subscribe to a SubscriptionPattern.";

public static class Builder extends AbstractRequest.Builder<ConsumerGroupHeartbeatRequest> {
private final ConsumerGroupHeartbeatRequestData data;

Expand All @@ -61,9 +65,7 @@ public Builder(ConsumerGroupHeartbeatRequestData data, boolean enableUnstableLas
@Override
public ConsumerGroupHeartbeatRequest build(short version) {
if (version == 0 && data.subscribedTopicRegex() != null) {
throw new UnsupportedVersionException("The cluster does not support regular expressions resolution " +
"on ConsumerGroupHeartbeat API version " + version + ". It must be upgraded to use " +
"ConsumerGroupHeartbeat API version >= 1 to allow to subscribe to a SubscriptionPattern.");
throw new UnsupportedVersionException(REGEX_RESOLUTION_NOT_SUPPORTED_MSG);
}
return new ConsumerGroupHeartbeatRequest(data, version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -65,6 +66,8 @@
import java.util.Set;
import java.util.SortedSet;

import static org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager.CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -602,30 +605,21 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
}
}

@Test
public void testUnsupportedVersion() {
mockErrorResponse(Errors.UNSUPPORTED_VERSION, null);
/**
* This validates the UnsupportedApiVersion the client generates while building a HB if:
* 1. HB API is not supported.
* 2. Required HB API version is not available.
*/
@ParameterizedTest
@ValueSource(strings = {CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, REGEX_RESOLUTION_NOT_SUPPORTED_MSG})
public void testUnsupportedVersion(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg));
ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();

// UnsupportedApiVersion in HB response without any custom message. It's considered as new protocol not supported.
String hbNotSupportedMsg = "The cluster does not support the new consumer group protocol. Set group" +
".protocol=classic on the consumer configs to revert to the classic protocol until the cluster is upgraded.";
assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error());
assertEquals(hbNotSupportedMsg, errorEvent.error().getMessage());
assertEquals(errorMsg, errorEvent.error().getMessage());
clearInvocations(backgroundEventHandler);

// UnsupportedApiVersion in HB response with custom message. Specific to required version not present, should
// keep the custom message.
String hbVersionNotSupportedMsg = "The cluster does not support resolution of SubscriptionPattern on version 0. " +
"It must be upgraded to version >= 1 to allow to subscribe to a SubscriptionPattern.";
mockErrorResponse(Errors.UNSUPPORTED_VERSION, hbVersionNotSupportedMsg);
errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
errorEvent = errorEventArgumentCaptor.getValue();
assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error());
assertEquals(hbVersionNotSupportedMsg, errorEvent.error().getMessage());
}

private void mockErrorResponse(Errors error, String exceptionCustomMsg) {
Expand All @@ -637,7 +631,17 @@ private void mockErrorResponse(Errors error, String exceptionCustomMsg) {
ClientResponse response = createHeartbeatResponse(
result.unsentRequests.get(0), error, exceptionCustomMsg);
result.unsentRequests.get(0).handler().onComplete(response);
ConsumerGroupHeartbeatResponse mockResponse = (ConsumerGroupHeartbeatResponse) response.responseBody();
}

private void mockResponseWithException(UnsupportedVersionException exception) {
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());

when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
ClientResponse response = createHeartbeatResponseWithException(
result.unsentRequests.get(0), exception);
result.unsentRequests.get(0).handler().onComplete(response);
}

private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) {
Expand Down Expand Up @@ -1038,6 +1042,23 @@ private ClientResponse createHeartbeatResponse(
response);
}

private ClientResponse createHeartbeatResponseWithException(
final NetworkClientDelegate.UnsentRequest request,
final UnsupportedVersionException exception
) {
ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(null);
return new ClientResponse(
new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
request.handler(),
"0",
time.milliseconds(),
time.milliseconds(),
false,
exception,
null,
response);
}

private ConsumerConfig config() {
Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Expand Down

0 comments on commit 36b4853

Please # to comment.