Skip to content

Commit

Permalink
KAFKA-16249; Improve reconciliation state machine (apache#15364)
Browse files Browse the repository at this point in the history
This patch re-work the reconciliation state machine on the server side with the goal to fix a few issues that we have recently discovered.
* When a member acknowledges the revocation of partitions (by not reporting them in the heartbeat), the current implementation may miss it. The issue is that the current implementation re-compute the assignment of a member whenever there is a new target assignment installed. When it happens, it does not consider the reported owned partitions at all. As the member is supposed to only report its own partitions when they change, the member is stuck.
* Similarly, as the current assignment is re-computed whenever there is a new target assignment, the rebalance timeout, as it is currently implemented, becomes useless. The issue is that the rebalance timeout is reset whenever the member enters the revocation state. In other words, in the current implementation, the timer is reset when there are no target available even if the previous revocation is not completed yet.

The patch fixes these two issues by not automatically recomputing the assignment of a member when a new target assignment is available. When the member must revoke partitions, the coordinator waits. Otherwise, it recomputes the next assignment. In other words, revoking is really blocking now.

The patch also proposes to include an explicit state in the record. It makes the implementation cleaner and it also makes it more extensible in the future.

The patch also changes the record format. This is a non-backward compatible change. I think that we should do this change to cleanup the record. As KIP-848 is only in early access in 3.7 and that we clearly state that we don't plane to support upgrade from it, this is acceptable in my opinion.

Reviewers: Jeff Kim <[email protected]>, Justine Olshan <[email protected]>
  • Loading branch information
dajac authored Mar 14, 2024
1 parent 612a1fe commit e164d4d
Show file tree
Hide file tree
Showing 17 changed files with 823 additions and 1,278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
import org.apache.kafka.coordinator.group.consumer.MemberState;
import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
Expand Down Expand Up @@ -98,6 +99,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -119,13 +121,15 @@
import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.Utils.assignmentToString;
import static org.apache.kafka.coordinator.group.Utils.ofSentinel;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPARING_REBALANCE;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
import static org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;

Expand Down Expand Up @@ -1058,7 +1062,6 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
}
}


int groupEpoch = group.groupEpoch();
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();

Expand Down Expand Up @@ -1166,38 +1169,17 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
}
}

// 3. Reconcile the member's assignment with the target assignment. This is only required if
// the member is not stable or if a new target assignment has been installed.
boolean assignmentUpdated = false;
if (updatedMember.state() != ConsumerGroupMember.MemberState.STABLE || updatedMember.targetMemberEpoch() != targetAssignmentEpoch) {
ConsumerGroupMember prevMember = updatedMember;
updatedMember = new CurrentAssignmentBuilder(updatedMember)
.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
.withCurrentPartitionEpoch(group::currentPartitionEpoch)
.withOwnedTopicPartitions(ownedTopicPartitions)
.build();

// Checking the reference is enough here because a new instance
// is created only when the state has changed.
if (updatedMember != prevMember) {
assignmentUpdated = true;
records.add(newCurrentAssignmentRecord(groupId, updatedMember));

log.info("[GroupId {}] Member {} transitioned from {} to {}.",
groupId, memberId, member.currentAssignmentSummary(), updatedMember.currentAssignmentSummary());

if (updatedMember.state() == ConsumerGroupMember.MemberState.REVOKING) {
scheduleConsumerGroupRevocationTimeout(
groupId,
memberId,
updatedMember.rebalanceTimeoutMs(),
updatedMember.memberEpoch()
);
} else {
cancelConsumerGroupRevocationTimeout(groupId, memberId);
}
}
}
// 3. Reconcile the member's assignment with the target assignment if the member is not
// fully reconciled yet.
updatedMember = maybeReconcile(
groupId,
updatedMember,
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
ownedTopicPartitions,
records
);

scheduleConsumerGroupSessionTimeout(groupId, memberId);

Expand All @@ -1211,13 +1193,71 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
// 1. The member reported its owned partitions;
// 2. The member just joined or rejoined to group (epoch equals to zero);
// 3. The member's assignment has been updated.
if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(createResponseAssignment(updatedMember));
}

return new CoordinatorResult<>(records, response);
}

/**
* Reconciles the current assignment of the member towards the target assignment if needed.
*
* @param groupId The group id.
* @param member The member to reconcile.
* @param currentPartitionEpoch The function returning the current epoch of
* a given partition.
* @param targetAssignmentEpoch The target assignment epoch.
* @param targetAssignment The target assignment.
* @param ownedTopicPartitions The list of partitions owned by the member. This
* is reported in the ConsumerGroupHeartbeat API and
* it could be null if not provided.
* @param records The list to accumulate any new records.
* @return The received member if no changes have been made; or a new
* member containing the new assignment.
*/
private ConsumerGroupMember maybeReconcile(
String groupId,
ConsumerGroupMember member,
BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
int targetAssignmentEpoch,
Assignment targetAssignment,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
List<Record> records
) {
if (member.isReconciledTo(targetAssignmentEpoch)) {
return member;
}

ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
.withCurrentPartitionEpoch(currentPartitionEpoch)
.withOwnedTopicPartitions(ownedTopicPartitions)
.build();

if (!updatedMember.equals(member)) {
records.add(newCurrentAssignmentRecord(groupId, updatedMember));

log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, "
+ "assignedPartitions={} and revokedPartitions={}.",
groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(),
assignmentToString(updatedMember.assignedPartitions()), assignmentToString(updatedMember.partitionsPendingRevocation()));

if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
scheduleConsumerGroupRebalanceTimeout(
groupId,
updatedMember.memberId(),
updatedMember.memberEpoch(),
updatedMember.rebalanceTimeoutMs()
);
} else {
cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId());
}
}

return updatedMember;
}

private void removeMemberAndCancelTimers(
List<Record> records,
String groupId,
Expand Down Expand Up @@ -1353,7 +1393,7 @@ private void removeMember(List<Record> records, String groupId, String memberId)
*/
private void cancelTimers(String groupId, String memberId) {
cancelConsumerGroupSessionTimeout(groupId, memberId);
cancelConsumerGroupRevocationTimeout(groupId, memberId);
cancelConsumerGroupRebalanceTimeout(groupId, memberId);
}

/**
Expand Down Expand Up @@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout(
}

/**
* Schedules a revocation timeout for the member.
* Schedules a rebalance timeout for the member.
*
* @param groupId The group id.
* @param memberId The member id.
* @param revocationTimeoutMs The revocation timeout.
* @param expectedMemberEpoch The expected member epoch.
* @param memberEpoch The member epoch.
* @param rebalanceTimeoutMs The rebalance timeout.
*/
private void scheduleConsumerGroupRevocationTimeout(
private void scheduleConsumerGroupRebalanceTimeout(
String groupId,
String memberId,
long revocationTimeoutMs,
int expectedMemberEpoch
int memberEpoch,
int rebalanceTimeoutMs
) {
String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
String key = consumerGroupRebalanceTimeoutKey(groupId, memberId);
timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
try {
ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);

if (member.state() != ConsumerGroupMember.MemberState.REVOKING ||
member.memberEpoch() != expectedMemberEpoch) {
log.debug("[GroupId {}] Ignoring revocation timeout for {} because the member " +
"state does not match the expected state.", groupId, memberId);
if (member.memberEpoch() == memberEpoch) {
log.info("[GroupId {}] Member {} fenced from the group because " +
"it failed to transition from epoch {} within {}ms.",
groupId, memberId, memberEpoch, rebalanceTimeoutMs);
return new CoordinatorResult<>(consumerGroupFenceMember(group, member));
} else {
log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member " +
"left the epoch {}.", groupId, memberId, memberEpoch);
return new CoordinatorResult<>(Collections.emptyList());
}

log.info("[GroupId {}] Member {} fenced from the group because " +
"it failed to revoke partitions within {}ms.", groupId, memberId, revocationTimeoutMs);
return new CoordinatorResult<>(consumerGroupFenceMember(group, member));
} catch (GroupIdNotFoundException ex) {
log.debug("[GroupId {}] Could not fence {}} because the group does not exist.",
groupId, memberId);
Expand All @@ -1442,16 +1482,16 @@ private void scheduleConsumerGroupRevocationTimeout(
}

/**
* Cancels the revocation timeout of the member.
* Cancels the rebalance timeout of the member.
*
* @param groupId The group id.
* @param memberId The member id.
*/
private void cancelConsumerGroupRevocationTimeout(
private void cancelConsumerGroupRebalanceTimeout(
String groupId,
String memberId
) {
timer.cancel(consumerGroupRevocationTimeoutKey(groupId, memberId));
timer.cancel(consumerGroupRebalanceTimeoutKey(groupId, memberId));
}

/**
Expand Down Expand Up @@ -1744,10 +1784,8 @@ public void replay(
ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(oldMember)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setTargetMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setAssignedPartitions(Collections.emptyMap())
.setPartitionsPendingRevocation(Collections.emptyMap())
.setPartitionsPendingAssignment(Collections.emptyMap())
.build();
consumerGroup.updateMember(newMember);
}
Expand Down Expand Up @@ -1796,12 +1834,12 @@ public void onLoaded() {
consumerGroup.members().forEach((memberId, member) -> {
log.debug("Loaded member {} in consumer group {}.", memberId, groupId);
scheduleConsumerGroupSessionTimeout(groupId, memberId);
if (member.state() == ConsumerGroupMember.MemberState.REVOKING) {
scheduleConsumerGroupRevocationTimeout(
if (member.state() == MemberState.UNREVOKED_PARTITIONS) {
scheduleConsumerGroupRebalanceTimeout(
groupId,
memberId,
member.rebalanceTimeoutMs(),
member.memberEpoch()
member.memberId(),
member.memberEpoch(),
member.rebalanceTimeoutMs()
);
}
});
Expand Down Expand Up @@ -1834,8 +1872,8 @@ public static String consumerGroupSessionTimeoutKey(String groupId, String membe
return "session-timeout-" + groupId + "-" + memberId;
}

public static String consumerGroupRevocationTimeoutKey(String groupId, String memberId) {
return "revocation-timeout-" + groupId + "-" + memberId;
public static String consumerGroupRebalanceTimeoutKey(String groupId, String memberId) {
return "rebalance-timeout-" + groupId + "-" + memberId;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,9 @@ public static Record newCurrentAssignmentRecord(
new ConsumerGroupCurrentMemberAssignmentValue()
.setMemberEpoch(member.memberEpoch())
.setPreviousMemberEpoch(member.previousMemberEpoch())
.setTargetMemberEpoch(member.targetMemberEpoch())
.setState(member.state().value())
.setAssignedPartitions(toTopicPartitions(member.assignedPartitions()))
.setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocation()))
.setPartitionsPendingAssignment(toTopicPartitions(member.partitionsPendingAssignment())),
.setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocation())),
(short) 0
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.Uuid;

import java.util.Iterator;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;

public class Utils {
private Utils() {}
Expand All @@ -37,4 +42,31 @@ public static OptionalInt ofSentinel(int value) {
public static OptionalLong ofSentinel(long value) {
return value != -1 ? OptionalLong.of(value) : OptionalLong.empty();
}

/**
* @return The provided assignment as a String.
*
* Example:
* [topicid1-0, topicid1-1, topicid2-0, topicid2-1]
*/
public static String assignmentToString(
Map<Uuid, Set<Integer>> assignment
) {
StringBuilder builder = new StringBuilder("[");
Iterator<Map.Entry<Uuid, Set<Integer>>> topicsIterator = assignment.entrySet().iterator();
while (topicsIterator.hasNext()) {
Map.Entry<Uuid, Set<Integer>> entry = topicsIterator.next();
Iterator<Integer> partitionsIterator = entry.getValue().iterator();
while (partitionsIterator.hasNext()) {
builder.append(entry.getKey());
builder.append("-");
builder.append(partitionsIterator.next());
if (partitionsIterator.hasNext() || topicsIterator.hasNext()) {
builder.append(", ");
}
}
}
builder.append("]");
return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ private void maybeUpdateGroupState() {
newState = ASSIGNING;
} else {
for (ConsumerGroupMember member : members.values()) {
if (member.targetMemberEpoch() != targetAssignmentEpoch.get() || member.state() != ConsumerGroupMember.MemberState.STABLE) {
if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
newState = RECONCILING;
break;
}
Expand Down
Loading

0 comments on commit e164d4d

Please sign in to comment.