Skip to content

Commit

Permalink
KAFKA-16375: Fix for rejoin while reconciling (apache#15579)
Browse files Browse the repository at this point in the history
This PR fixes how the member identifies a reconciliation that must be interrupted, and not applied, because the member has rejoined the group. The check relies on a simple flag rather than on epochs (server or local): if the member rejoins while reconciling, the reconciliation is interrupted.

This also ensures that the check to abort the reconciliation is performed at all three stages of the reconciliation that could be delayed: commit, onPartitionsRevoked, and onPartitionsAssigned.

Reviewers: David Jacot <[email protected]>, Lucas Brutschy <[email protected]>
  • Loading branch information
lianetm authored Mar 25, 2024
1 parent e9e007a commit 0104cd0
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,11 @@ public class MembershipManagerImpl implements MembershipManager {
private boolean reconciliationInProgress;

/**
* Epoch the member had when the reconciliation in progress started. This is used to identify if
* the member has rejoined while it was reconciling an assignment (in which case the result
* of the reconciliation is not applied.)
* True if a reconciliation is in progress and the member rejoined the group since the start
* of the reconciliation. Used to know that the reconciliation in progress should be
* interrupted and not be applied.
*/
private int memberEpochOnReconciliationStart;
private boolean rejoinedWhileReconciliationInProgress;

/**
* If the member is currently leaving the group after a call to {@link #leaveGroup()}}, this
Expand Down Expand Up @@ -641,6 +641,9 @@ public void transitionToJoining() {
"the member is in FATAL state");
return;
}
if (reconciliationInProgress) {
rejoinedWhileReconciliationInProgress = true;
}
resetEpoch();
transitionTo(MemberState.JOINING);
clearPendingAssignmentsAndLocalNamesCache();
Expand Down Expand Up @@ -972,7 +975,10 @@ void maybeReconcile() {
log.debug("Auto-commit before reconciling new assignment completed successfully.");
}

revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
if (!maybeAbortReconciliation()) {
revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
}

}).exceptionally(error -> {
if (error != null) {
log.error("Reconciliation failed.", error);
Expand Down Expand Up @@ -1010,49 +1016,54 @@ private void revokeAndAssign(LocalAssignment resolvedAssignment,
// and assignment, executed sequentially).
CompletableFuture<Void> reconciliationResult =
revocationResult.thenCompose(__ -> {
boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch;
if (state == MemberState.RECONCILING && !memberHasRejoined) {
if (!maybeAbortReconciliation()) {
// Apply assignment
return assignPartitions(assignedTopicIdPartitions, addedPartitions);
} else {
log.debug("Revocation callback completed but the member already " +
"transitioned out of the reconciling state for epoch {} into " +
"{} state with epoch {}. Interrupting reconciliation as it's " +
"not relevant anymore,", memberEpochOnReconciliationStart, state, memberEpoch);
String reason = interruptedReconciliationErrorMessage();
CompletableFuture<Void> res = new CompletableFuture<>();
res.completeExceptionally(new KafkaException("Interrupting reconciliation" +
" after revocation. " + reason));
return res;
}
return CompletableFuture.completedFuture(null);
});

reconciliationResult.whenComplete((result, error) -> {
markReconciliationCompleted();
reconciliationResult.whenComplete((__, error) -> {
if (error != null) {
// Leaving member in RECONCILING state after callbacks fail. The member
// won't send the ack, and the expectation is that the broker will kick the
// member out of the group after the rebalance timeout expires, leading to a
// RECONCILING -> FENCED transition.
log.error("Reconciliation failed.", error);
markReconciliationCompleted();
} else {
if (state == MemberState.RECONCILING) {
if (reconciliationInProgress && !maybeAbortReconciliation()) {
currentAssignment = resolvedAssignment;

// Reschedule the auto commit starting from now that the member has a new assignment.
commitRequestManager.resetAutoCommitTimer();

// Make assignment effective on the broker by transitioning to send acknowledge.
transitionTo(MemberState.ACKNOWLEDGING);
} else {
String reason = interruptedReconciliationErrorMessage();
log.error("Interrupting reconciliation after partitions assigned callback " +
"completed. " + reason);
markReconciliationCompleted();
}
}
});
}

/**
* Checks whether the reconciliation in progress should stop and, if so, marks it as completed.
*
* <p>The reconciliation is aborted when the member is not in the RECONCILING state anymore
* (member failed or is leaving the group), or when it has rejoined the group since the
* reconciliation started. Note that after rejoining the member could be RECONCILING again,
* so checking the state alone is not enough.
*
* <p>Side effects when aborting: the reason is logged at INFO level and
* {@link #markReconciliationCompleted()} is invoked, which clears both the in-progress flag
* and the rejoin flag.
*
* @return True if the reconciliation in progress should not continue.
*/
boolean maybeAbortReconciliation() {
    boolean shouldAbort = state != MemberState.RECONCILING || rejoinedWhileReconciliationInProgress;
    if (shouldAbort) {
        // Resolve the reason before markReconciliationCompleted() resets the rejoin flag.
        String reason = rejoinedWhileReconciliationInProgress ?
            "the member has re-joined the group" :
            "the member already transitioned out of the reconciling state into " + state;
        // Placeholder-style logging, consistent with the other log statements in this class.
        log.info("Interrupting reconciliation that is not relevant anymore because {}", reason);
        markReconciliationCompleted();
    }
    return shouldAbort;
}

// Visible for testing.
void updateAssignment(Map<Uuid, SortedSet<Integer>> partitions) {
currentAssignment = new LocalAssignment(0, partitions);
Expand All @@ -1067,32 +1078,20 @@ private SortedSet<TopicPartition> toTopicPartitionSet(SortedSet<TopicIdPartition
return result;
}

/**
* Builds the explanation attached to a reconciliation that is interrupted once its
* callbacks have completed.
*
* @return A message naming the state the member moved to when it is no longer RECONCILING,
* or a message stating that the member re-joined the group otherwise.
*/
private String interruptedReconciliationErrorMessage() {
    return state != MemberState.RECONCILING
        ? "The member already transitioned out of the reconciling state into " + state
        : "The member has re-joined the group.";
}

/**
* Flags that a reconciliation has started: sets {@code reconciliationInProgress}, captures
* the current {@code memberEpoch}, and clears {@code rejoinedWhileReconciliationInProgress}.
* Visible for testing.
*/
void markReconciliationInProgress() {
reconciliationInProgress = true;
// Snapshot of the member epoch at the moment the reconciliation begins.
memberEpochOnReconciliationStart = memberEpoch;
// A reconciliation always starts with no rejoin recorded; transitionToJoining() raises
// this flag if the member rejoins while the reconciliation is still in progress.
rejoinedWhileReconciliationInProgress = false;
}

/**
* Flags that no reconciliation is running anymore by clearing both
* {@code reconciliationInProgress} and {@code rejoinedWhileReconciliationInProgress}.
* Visible for testing.
*/
void markReconciliationCompleted() {
reconciliationInProgress = false;
// The rejoin flag is only meaningful while a reconciliation is in progress, so it is
// cleared together with the in-progress flag.
rejoinedWhileReconciliationInProgress = false;
}

/**
Expand Down
Loading

0 comments on commit 0104cd0

Please sign in to comment.