From 36b48536f62698a5cc6be3c24bb68da6e2602b57 Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Thu, 5 Dec 2024 21:31:52 -0500 Subject: [PATCH] MINOR: Fix broken test (#18062) Reviewers: David Jacot , Chia-Ping Tsai , TaiJuWu --- .../ConsumerGroupHeartbeatRequest.java | 8 ++- .../ConsumerHeartbeatRequestManagerTest.java | 61 +++++++++++++------ 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java index 4f977a99fbe09..5b09131d49470 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java @@ -46,6 +46,10 @@ public class ConsumerGroupHeartbeatRequest extends AbstractRequest { */ public static final int CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION = 1; + public static final String REGEX_RESOLUTION_NOT_SUPPORTED_MSG = "The cluster does not support " + + "regular expressions resolution on ConsumerGroupHeartbeat API version 0. It must be upgraded to use " + + "ConsumerGroupHeartbeat API version >= 1 to allow to subscribe to a SubscriptionPattern."; + public static class Builder extends AbstractRequest.Builder { private final ConsumerGroupHeartbeatRequestData data; @@ -61,9 +65,7 @@ public Builder(ConsumerGroupHeartbeatRequestData data, boolean enableUnstableLas @Override public ConsumerGroupHeartbeatRequest build(short version) { if (version == 0 && data.subscribedTopicRegex() != null) { - throw new UnsupportedVersionException("The cluster does not support regular expressions resolution " + - "on ConsumerGroupHeartbeat API version " + version + ". It must be upgraded to use " + - "ConsumerGroupHeartbeat API version >= 1 to allow to subscribe to a SubscriptionPattern."); + throw new UnsupportedVersionException(REGEX_RESOLUTION_NOT_SUPPORTED_MSG); } return new ConsumerGroupHeartbeatRequest(data, version); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java index 415a84ebbb906..8fc23f6255acf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.metrics.Metrics; @@ -65,6 +66,8 @@ import java.util.Set; import java.util.SortedSet; +import static org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager.CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG; +import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.REGEX_RESOLUTION_NOT_SUPPORTED_MSG; import static org.apache.kafka.common.utils.Utils.mkSortedSet; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -602,30 +605,21 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole } } - @Test - public void testUnsupportedVersion() { - mockErrorResponse(Errors.UNSUPPORTED_VERSION, null); + /** + * This validates the UnsupportedApiVersion the client generates while building a HB if: + * 1. HB API is not supported. + * 2. Required HB API version is not available. + */ + @ParameterizedTest + @ValueSource(strings = {CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, REGEX_RESOLUTION_NOT_SUPPORTED_MSG}) + public void testUnsupportedVersion(String errorMsg) { + mockResponseWithException(new UnsupportedVersionException(errorMsg)); ArgumentCaptor errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); ErrorEvent errorEvent = errorEventArgumentCaptor.getValue(); - - // UnsupportedApiVersion in HB response without any custom message. It's considered as new protocol not supported. - String hbNotSupportedMsg = "The cluster does not support the new consumer group protocol. Set group" + - ".protocol=classic on the consumer configs to revert to the classic protocol until the cluster is upgraded."; assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error()); - assertEquals(hbNotSupportedMsg, errorEvent.error().getMessage()); + assertEquals(errorMsg, errorEvent.error().getMessage()); clearInvocations(backgroundEventHandler); - - // UnsupportedApiVersion in HB response with custom message. Specific to required version not present, should - // keep the custom message. - String hbVersionNotSupportedMsg = "The cluster does not support resolution of SubscriptionPattern on version 0. " + - "It must be upgraded to version >= 1 to allow to subscribe to a SubscriptionPattern."; - mockErrorResponse(Errors.UNSUPPORTED_VERSION, hbVersionNotSupportedMsg); - errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); - verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); - errorEvent = errorEventArgumentCaptor.getValue(); - assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error()); - assertEquals(hbVersionNotSupportedMsg, errorEvent.error().getMessage()); } private void mockErrorResponse(Errors error, String exceptionCustomMsg) { @@ -637,7 +631,17 @@ private void mockErrorResponse(Errors error, String exceptionCustomMsg) { ClientResponse response = createHeartbeatResponse( result.unsentRequests.get(0), error, exceptionCustomMsg); result.unsentRequests.get(0).handler().onComplete(response); - ConsumerGroupHeartbeatResponse mockResponse = (ConsumerGroupHeartbeatResponse) response.responseBody(); + } + + private void mockResponseWithException(UnsupportedVersionException exception) { + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + ClientResponse response = createHeartbeatResponseWithException( + result.unsentRequests.get(0), exception); + result.unsentRequests.get(0).handler().onComplete(response); } private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { @@ -1038,6 +1042,23 @@ private ClientResponse createHeartbeatResponse( response); } + private ClientResponse createHeartbeatResponseWithException( + final NetworkClientDelegate.UnsentRequest request, + final UnsupportedVersionException exception + ) { + ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(null); + return new ClientResponse( + new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1), + request.handler(), + "0", + time.milliseconds(), + time.milliseconds(), + false, + exception, + null, + response); + } + private ConsumerConfig config() { Properties prop = new Properties(); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);