Skip to content

Commit

Permalink
KAFKA-18017: Fix call order for HB error and group manager (apache#17805
Browse files Browse the repository at this point in the history
)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
lianetm authored Nov 18, 2024
1 parent 7ef56a9 commit 5cf9872
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
*/
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
if (!coordinatorRequestManager.coordinator().isPresent() ||
membershipManager().shouldSkipHeartbeat()) {
if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) {
membershipManager().onHeartbeatRequestSkipped();
return NetworkClientDelegate.PollResult.EMPTY;
}
Expand Down Expand Up @@ -305,7 +304,6 @@ private NetworkClientDelegate.UnsentRequest logResponse(final NetworkClientDeleg
private void onFailure(final Throwable exception, final long responseTimeMs) {
this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
resetHeartbeatState();
membershipManager().onHeartbeatFailure(exception instanceof RetriableException);
if (exception instanceof RetriableException) {
coordinatorRequestManager.handleCoordinatorDisconnect(exception, responseTimeMs);
String message = String.format("%s failed because of the retriable exception. Will retry in %s ms: %s",
Expand All @@ -317,6 +315,8 @@ private void onFailure(final Throwable exception, final long responseTimeMs) {
logger.error("{} failed due to fatal error: {}", heartbeatRequestName(), exception.getMessage());
handleFatalFailure(exception);
}
// Notify the group manager about the failure after all errors have been handled and propagated.
membershipManager().onHeartbeatFailure(exception instanceof RetriableException);
}

private void onResponse(final R response, final long currentTimeMs) {
Expand All @@ -336,7 +336,6 @@ private void onErrorResponse(final R response, final long currentTimeMs) {

resetHeartbeatState();
this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
membershipManager().onHeartbeatFailure(false);

switch (error) {
case NOT_COORDINATOR:
Expand Down Expand Up @@ -415,6 +414,9 @@ private void onErrorResponse(final R response, final long currentTimeMs) {
}
break;
}

// Notify the group manager about the failure after all errors have been handled and propagated.
membershipManager().onHeartbeatFailure(false);
}

protected void logInfo(final String message, final R response, final long currentTimeMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
Expand All @@ -52,6 +53,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -74,6 +76,7 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -418,6 +421,36 @@ public void testFailureOnFatalException() {
verify(backgroundEventHandler).add(any());
}

@Test
public void testHeartbeatResponseErrorNotifiedToGroupManagerAfterErrorPropagated() {
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());
ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_AUTHORIZATION_FAILED);
result.unsentRequests.get(0).handler().onComplete(response);

// The error should be propagated before notifying the group manager. This ensures that the app thread is aware
// of the HB error before the manager completes any ongoing unsubscribe.
InOrder inOrder = inOrder(backgroundEventHandler, membershipManager);
inOrder.verify(backgroundEventHandler).add(any(ErrorEvent.class));
inOrder.verify(membershipManager).onHeartbeatFailure(false);
}

@Test
public void testHeartbeatRequestFailureNotifiedToGroupManagerAfterErrorPropagated() {
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());
ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_AUTHORIZATION_FAILED);
result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new AuthenticationException("Fatal error in HB"));

// The error should be propagated before notifying the group manager. This ensures that the app thread is aware
// of the HB error before the manager completes any ongoing unsubscribe.
InOrder inOrder = inOrder(backgroundEventHandler, membershipManager);
inOrder.verify(backgroundEventHandler).add(any(ErrorEvent.class));
inOrder.verify(membershipManager).onHeartbeatFailure(false);
}

@Test
public void testNoCoordinator() {
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
Expand Down

0 comments on commit 5cf9872

Please sign in to comment.