diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index 1f8ddc725b58d..eb7611f7b8a73 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -161,6 +161,7 @@ public abstract class AbstractHeartbeatRequestManager backgroundEventHandler.add(new ErrorEvent(fatalError))); + } + private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); heartbeatRequestState.onSendAttempt(currentTimeMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 4f0deef5bf890..1d3503886a9c4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -176,9 +176,11 @@ public CommitRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { - // poll only when the coordinator node is known. - if (coordinatorRequestManager.coordinator().isEmpty()) + // poll when the coordinator node is known and fatal error is not present + if (coordinatorRequestManager.coordinator().isEmpty()) { + pendingRequests.maybeFailOnCoordinatorFatalError(); return EMPTY; + } if (closing) { return drainPendingOffsetCommitRequests(); @@ -1246,6 +1248,16 @@ private List drainPendingCommits() { clearAll(); return res; } + + private void maybeFailOnCoordinatorFatalError() { + coordinatorRequestManager.fatalError().ifPresent(error -> { + log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error); + unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error)); + unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error)); + clearAll(); + } + ); + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java index 0f1650d0e674b..dd53ae11790f8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; @@ -53,7 +51,6 @@ public class CoordinatorRequestManager implements RequestManager { private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000; private final Logger log; - private final BackgroundEventHandler backgroundEventHandler; private final String groupId; private final RequestState coordinatorRequestState; @@ -61,17 +58,21 @@ public class CoordinatorRequestManager implements RequestManager { private long totalDisconnectedMin = 0; private boolean closing = false; private Node coordinator; + // Hold the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take + // appropriate actions. + // For example: + // - AbstractHeartbeatRequestManager propagates the error event to the application thread. + // - CommitRequestManager fail pending requests. + private Optional fatalError = Optional.empty(); public CoordinatorRequestManager( final LogContext logContext, final long retryBackoffMs, final long retryBackoffMaxMs, - final BackgroundEventHandler errorHandler, final String groupId ) { Objects.requireNonNull(groupId); this.log = logContext.logger(this.getClass()); - this.backgroundEventHandler = errorHandler; this.groupId = groupId; this.coordinatorRequestState = new RequestState( logContext, @@ -120,6 +121,7 @@ NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long curren ); return unsentRequest.whenComplete((clientResponse, throwable) -> { + getAndClearFatalError(); if (clientResponse != null) { FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody(); onResponse(clientResponse.receivedTimeMs(), response); @@ -206,12 +208,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) { log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage()); KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId); - backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException)); + fatalError = Optional.of(groupAuthorizationException); return; } log.warn("FindCoordinator request failed due to fatal exception", exception); - backgroundEventHandler.add(new ErrorEvent(exception)); + fatalError = Optional.of(exception); } /** @@ -250,4 +252,14 @@ private void onResponse( public Optional coordinator() { return Optional.ofNullable(this.coordinator); } + + public Optional getAndClearFatalError() { + Optional fatalError = this.fatalError; + this.fatalError = Optional.empty(); + return fatalError; + } + + public Optional fatalError() { + return fatalError; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 01b62c89ac86c..68be535a18ae9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -194,7 +194,6 @@ protected RequestManagers create() { logContext, retryBackoffMs, retryBackoffMaxMs, - backgroundEventHandler, groupRebalanceConfig.groupId); commitRequestManager = new CommitRequestManager( time, @@ -295,7 +294,6 @@ protected RequestManagers create() { logContext, retryBackoffMs, retryBackoffMaxMs, - backgroundEventHandler, groupRebalanceConfig.groupId); ShareMembershipManager shareMembershipManager = new ShareMembershipManager( logContext, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 33ca2844305c7..2749563df2742 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2239,7 +2239,7 @@ public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws In // by the background thread, so it can realize there is authentication fail and then // throw the AuthenticationException assertPollEventuallyThrows(consumer, AuthenticationException.class, - "he consumer was not able to discover metadata errors during continuous polling."); + "this consumer was not able to discover metadata errors during continuous polling."); } else { assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 252d5a7ccbd08..1b7ed8599238d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -1499,6 +1499,23 @@ public void testSignalClose() { assertEquals("topic", data.topics().get(0).name()); } + @Test + public void testPollWithFatalErrorShouldFailAllUnsentRequests() { + CommitRequestManager commitRequestManager = create(true, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + + commitRequestManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), 200); + assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.size()); + + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + when(coordinatorRequestManager.fatalError()) + .thenReturn(Optional.of(new GroupAuthorizationException("Group authorization exception"))); + + assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200)); + + assertEmptyPendingRequests(commitRequestManager); + } + private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) { assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty()); assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java index 2165cb814ed1e..0ed902d7f278d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java @@ -18,9 +18,7 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.Node; -import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -35,6 +33,8 @@ import org.apache.logging.log4j.Level; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; import java.util.List; @@ -49,9 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; public class CoordinatorRequestManagerTest { @@ -191,23 +189,10 @@ public void testBackoffAfterRetriableFailure() { } @Test - public void testPropagateAndBackoffAfterFatalError() { + public void testBackoffAfterFatalError() { CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); - verify(backgroundEventHandler).add(argThat(backgroundEvent -> { - if (!(backgroundEvent instanceof ErrorEvent)) - return false; - - RuntimeException exception = ((ErrorEvent) backgroundEvent).error(); - - if (!(exception instanceof GroupAuthorizationException)) - return false; - - GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception; - return groupAuthException.groupId().equals(GROUP_ID); - })); - time.sleep(RETRY_BACKOFF_MS - 1); assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); @@ -254,19 +239,20 @@ public void testNetworkTimeout() { assertEquals(1, res2.unsentRequests.size()); } - @Test - public void testSignalOnClose() { + @ParameterizedTest + @EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"}) + public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) { CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); - expectFindCoordinatorRequest(coordinatorManager, Errors.NONE); - assertTrue(coordinatorManager.coordinator().isPresent()); - coordinatorManager.markCoordinatorUnknown("coordinator changed", time.milliseconds()); - assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); - coordinatorManager.signalClose(); - time.sleep(RETRY_BACKOFF_MS - 1); - assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); + expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); + assertTrue(coordinatorManager.fatalError().isPresent()); + time.sleep(RETRY_BACKOFF_MS); - assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests, - "Should not generate find coordinator request during close"); + // there are no successful responses, so the fatal error should persist + assertTrue(coordinatorManager.fatalError().isPresent()); + + // receiving a successful response should clear the fatal error + expectFindCoordinatorRequest(coordinatorManager, error); + assertTrue(coordinatorManager.fatalError().isEmpty()); } private void expectFindCoordinatorRequest( @@ -288,7 +274,6 @@ private CoordinatorRequestManager setupCoordinatorManager(String groupId) { new LogContext(), RETRY_BACKOFF_MS, RETRY_BACKOFF_MS, - this.backgroundEventHandler, groupId ); } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4256d40da4ad2..f8e3e1fc54afc 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1294,7 +1294,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = { val consumer = createConsumer() assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) @@ -1331,7 +1331,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) val consumer = createConsumer() @@ -1357,7 +1357,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { createTopicWithBrokerPrincipal(topic) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 3b400c573f93c..4beb3943bea89 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -435,8 +435,4 @@ object QuorumTestHarness { Arguments.of("kraft", GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) ) } - - // The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer - // implementation that would otherwise cause tests to fail. - def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly }