Skip to content

Commit

Permalink
KAFKA-18034: CommitRequestManager should fail pending requests on fat…
Browse files Browse the repository at this point in the history
…al coordinator errors (#18548)

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Kirk True <ktrue@confluent.io>
  • Loading branch information
m1a2st authored Jan 30, 2025
1 parent be96807 commit 4b29fd6
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) {
membershipManager().onHeartbeatRequestSkipped();
maybePropagateCoordinatorFatalErrorEvent();
return NetworkClientDelegate.PollResult.EMPTY;
}
pollTimer.update(currentTimeMs);
Expand Down Expand Up @@ -263,6 +264,11 @@ public void resetPollTimer(final long pollMs) {
pollTimer.reset(maxPollIntervalMs);
}

/**
 * Takes (and clears) the fatal error held by the coordinator request manager and, when one is
 * present, hands it to the background event handler wrapped in an {@link ErrorEvent}.
 * Does nothing when no fatal error is stored.
 */
private void maybePropagateCoordinatorFatalErrorEvent() {
    coordinatorRequestManager.getAndClearFatalError().ifPresent(coordinatorError -> {
        // Wrap the error in an event and enqueue it on the background event handler.
        ErrorEvent errorEvent = new ErrorEvent(coordinatorError);
        backgroundEventHandler.add(errorEvent);
    });
}

private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) {
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
heartbeatRequestState.onSendAttempt(currentTimeMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,11 @@ public CommitRequestManager(
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
// poll only when the coordinator node is known.
if (coordinatorRequestManager.coordinator().isEmpty())
// poll only when the coordinator node is known; otherwise fail pending requests if a fatal coordinator error is present
if (coordinatorRequestManager.coordinator().isEmpty()) {
pendingRequests.maybeFailOnCoordinatorFatalError();
return EMPTY;
}

if (closing) {
return drainPendingOffsetCommitRequests();
Expand Down Expand Up @@ -1246,6 +1248,16 @@ private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
clearAll();
return res;
}

/**
 * Fails every unsent offset commit and offset fetch request when the coordinator request
 * manager currently reports a fatal error. The error is only read here, not cleared.
 * Does nothing when no fatal error is present.
 */
private void maybeFailOnCoordinatorFatalError() {
    coordinatorRequestManager.fatalError().ifPresent(this::failUnsentRequests);
}

/**
 * Logs the given cause, completes the futures of all unsent commit and fetch requests
 * exceptionally with it, and then invokes {@code clearAll()}.
 *
 * @param cause the fatal coordinator error used to fail the unsent requests
 */
private void failUnsentRequests(final Throwable cause) {
    log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", cause);
    unsentOffsetCommits.forEach(commit -> commit.future.completeExceptionally(cause));
    unsentOffsetFetches.forEach(fetch -> fetch.future.completeExceptionally(cause));
    clearAll();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
Expand Down Expand Up @@ -53,25 +51,28 @@
public class CoordinatorRequestManager implements RequestManager {
private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
private final Logger log;
private final BackgroundEventHandler backgroundEventHandler;
private final String groupId;

private final RequestState coordinatorRequestState;
private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
private long totalDisconnectedMin = 0;
private boolean closing = false;
private Node coordinator;
// Hold the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take
// appropriate actions.
// For example:
// - AbstractHeartbeatRequestManager propagates the error event to the application thread.
// - CommitRequestManager fails pending requests.
private Optional<Throwable> fatalError = Optional.empty();

public CoordinatorRequestManager(
final LogContext logContext,
final long retryBackoffMs,
final long retryBackoffMaxMs,
final BackgroundEventHandler errorHandler,
final String groupId
) {
Objects.requireNonNull(groupId);
this.log = logContext.logger(this.getClass());
this.backgroundEventHandler = errorHandler;
this.groupId = groupId;
this.coordinatorRequestState = new RequestState(
logContext,
Expand Down Expand Up @@ -120,6 +121,7 @@ NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long curren
);

return unsentRequest.whenComplete((clientResponse, throwable) -> {
getAndClearFatalError();
if (clientResponse != null) {
FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody();
onResponse(clientResponse.receivedTimeMs(), response);
Expand Down Expand Up @@ -206,12 +208,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
fatalError = Optional.of(groupAuthorizationException);
return;
}

log.warn("FindCoordinator request failed due to fatal exception", exception);
backgroundEventHandler.add(new ErrorEvent(exception));
fatalError = Optional.of(exception);
}

/**
Expand Down Expand Up @@ -250,4 +252,14 @@ private void onResponse(
public Optional<Node> coordinator() {
    // The coordinator field may be null; expose it as an Optional so callers never see null.
    final Node current = this.coordinator;
    return Optional.ofNullable(current);
}

/**
 * Returns the fatal error currently stored and resets the stored value to empty.
 *
 * @return the fatal error that was stored before this call, or an empty Optional if there was none
 */
public Optional<Throwable> getAndClearFatalError() {
    final Optional<Throwable> previous = this.fatalError;
    this.fatalError = Optional.empty();
    return previous;
}

/**
 * Returns the fatal error currently stored, without clearing it.
 *
 * @return the stored fatal error, or an empty Optional if there is none
 */
public Optional<Throwable> fatalError() {
    return this.fatalError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ protected RequestManagers create() {
logContext,
retryBackoffMs,
retryBackoffMaxMs,
backgroundEventHandler,
groupRebalanceConfig.groupId);
commitRequestManager = new CommitRequestManager(
time,
Expand Down Expand Up @@ -295,7 +294,6 @@ protected RequestManagers create() {
logContext,
retryBackoffMs,
retryBackoffMaxMs,
backgroundEventHandler,
groupRebalanceConfig.groupId);
ShareMembershipManager shareMembershipManager = new ShareMembershipManager(
logContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2239,7 +2239,7 @@ public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws In
// by the background thread, so it can realize there is authentication fail and then
// throw the AuthenticationException
assertPollEventuallyThrows(consumer, AuthenticationException.class,
"he consumer was not able to discover metadata errors during continuous polling.");
"this consumer was not able to discover metadata errors during continuous polling.");
} else {
assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,23 @@ public void testSignalClose() {
assertEquals("topic", data.topics().get(0).name());
}

@Test
public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
    // Queue a single offset fetch while the mocked coordinator lookup returns a node.
    final CommitRequestManager manager = create(true, 100);
    when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));

    final TopicPartition partition = new TopicPartition("test", 0);
    manager.fetchOffsets(Collections.singleton(partition), 200);
    assertEquals(1, manager.pendingRequests.unsentOffsetFetches.size());

    // Now report no coordinator together with a fatal error.
    final GroupAuthorizationException fatal = new GroupAuthorizationException("Group authorization exception");
    when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
    when(coordinatorRequestManager.fatalError()).thenReturn(Optional.of(fatal));

    // Polling must yield nothing to send and leave no pending requests behind.
    final NetworkClientDelegate.PollResult pollResult = manager.poll(200);
    assertEquals(NetworkClientDelegate.PollResult.EMPTY, pollResult);

    assertEmptyPendingRequests(manager);
}

private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) {
assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty());
assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
Expand All @@ -35,6 +33,8 @@
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.util.Collections;
import java.util.List;
Expand All @@ -49,9 +49,7 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

public class CoordinatorRequestManagerTest {
Expand Down Expand Up @@ -191,23 +189,10 @@ public void testBackoffAfterRetriableFailure() {
}

@Test
public void testPropagateAndBackoffAfterFatalError() {
public void testBackoffAfterFatalError() {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);

verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
if (!(backgroundEvent instanceof ErrorEvent))
return false;

RuntimeException exception = ((ErrorEvent) backgroundEvent).error();

if (!(exception instanceof GroupAuthorizationException))
return false;

GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception;
return groupAuthException.groupId().equals(GROUP_ID);
}));

time.sleep(RETRY_BACKOFF_MS - 1);
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);

Expand Down Expand Up @@ -254,19 +239,20 @@ public void testNetworkTimeout() {
assertEquals(1, res2.unsentRequests.size());
}

@Test
public void testSignalOnClose() {
@ParameterizedTest
@EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"})
public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager, Errors.NONE);
assertTrue(coordinatorManager.coordinator().isPresent());
coordinatorManager.markCoordinatorUnknown("coordinator changed", time.milliseconds());
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
coordinatorManager.signalClose();
time.sleep(RETRY_BACKOFF_MS - 1);
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);
assertTrue(coordinatorManager.fatalError().isPresent());

time.sleep(RETRY_BACKOFF_MS);
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests,
"Should not generate find coordinator request during close");
// there are no successful responses, so the fatal error should persist
assertTrue(coordinatorManager.fatalError().isPresent());

// receiving a successful response should clear the fatal error
expectFindCoordinatorRequest(coordinatorManager, error);
assertTrue(coordinatorManager.fatalError().isEmpty());
}

private void expectFindCoordinatorRequest(
Expand All @@ -288,7 +274,6 @@ private CoordinatorRequestManager setupCoordinatorManager(String groupId) {
new LogContext(),
RETRY_BACKOFF_MS,
RETRY_BACKOFF_MS,
this.backgroundEventHandler,
groupId
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer()
assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava))
Expand Down Expand Up @@ -1331,7 +1331,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
val consumer = createConsumer()
Expand All @@ -1357,7 +1357,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,4 @@ object QuorumTestHarness {
Arguments.of("kraft", GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT))
)
}

// The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
// implementation that would otherwise cause tests to fail.
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
}

0 comments on commit 4b29fd6

Please sign in to comment.