Skip to content

Commit

Permalink
KAFKA-17733: Protocol upgrade should allow empty member assignment in…
Browse files Browse the repository at this point in the history
… group conversion (apache#17853)

During conversion from classic to consumer group, if a member has empty assignment (e.g., the member just joined and has never synced), the conversion will fail because of the buffer underflow error when deserializing the member assignment. This patch allows empty assignment while deserializing the member assignment.

Reviewers: Jeff Kim <[email protected]>, David Jacot <[email protected]>
  • Loading branch information
dongnuo123 authored Nov 19, 2024
1 parent b5158aa commit 8ccb26d
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.timeline.TimelineObject;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -56,6 +57,7 @@

import static org.apache.kafka.coordinator.group.Utils.toOptional;
import static org.apache.kafka.coordinator.group.Utils.toTopicPartitionMap;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT;
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.ASSIGNING;
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.RECONCILING;
Expand Down Expand Up @@ -939,12 +941,22 @@ public static ConsumerGroup fromClassicGroup(
consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());

classicGroup.allMembers().forEach(classicGroupMember -> {
Map<Uuid, Set<Integer>> assignedPartitions = toTopicPartitionMap(
ConsumerProtocol.deserializeConsumerProtocolAssignment(
ByteBuffer.wrap(classicGroupMember.assignment())
),
topicsImage
);
// The assigned partition can be empty if the member just joined and has never synced.
// We should accept the empty assignment.
Map<Uuid, Set<Integer>> assignedPartitions;
if (Arrays.equals(classicGroupMember.assignment(), EMPTY_ASSIGNMENT)) {
assignedPartitions = Collections.emptyMap();
} else {
assignedPartitions = toTopicPartitionMap(
ConsumerProtocol.deserializeConsumerProtocolAssignment(
ByteBuffer.wrap(classicGroupMember.assignment())
),
topicsImage
);
}

// Every member is guaranteed to have metadata set when it joins,
// so we don't check for empty subscription here.
ConsumerProtocolSubscription subscription = ConsumerProtocol.deserializeConsumerProtocolSubscription(
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10240,6 +10240,127 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
context.assertSessionTimeout(groupId, newMemberId, 45000);
}

@Test
public void testConsumerGroupHeartbeatToClassicGroupWithEmptyAssignmentMember() throws ExecutionException, InterruptedException {
String groupId = "group-id";
String memberId2 = "member-id-2";
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";

MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 1)
.addTopic(barTopicId, barTopicName, 1)
.addRacks()
.build();

GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.UPGRADE)
.withConsumerGroupAssignors(Collections.singletonList(new NoOpPartitionAssignor()))
.withMetadataImage(metadataImage)
.build();

JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName("range")
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
List.of(fooTopicName, barTopicName)
))))
);

// Member 1 joins, creating a new classic group.
GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(
new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
.withGroupId(groupId)
.withMemberId(UNKNOWN_MEMBER_ID)
.withProtocols(protocols)
.withSessionTimeoutMs(5000)
.withRebalanceTimeoutMs(10000)
.build()
);

// Triggering completion of the rebalance.
// Member 1 has never synced so its assignment is empty.
context.sleep(3000 + 1);
String memberId1 = joinResult.joinFuture.get().memberId();
ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
assertTrue(group.isInState(COMPLETING_REBALANCE));

// A new member 2 with new protocol joins the classic group, triggering the upgrade.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setRebalanceTimeoutMs(5000)
.setServerAssignor(NoOpPartitionAssignor.NAME)
.setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
.setTopicPartitions(Collections.emptyList()));

ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
.setMemberEpoch(1)
.setPreviousMemberEpoch(1)
.setState(MemberState.STABLE)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
.setRebalanceTimeoutMs(10000)
.setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSessionTimeoutMs(5000)
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols))
)
.setAssignedPartitions(Collections.emptyMap())
.build();

ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
.setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setState(MemberState.STABLE)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setServerAssignorName(NoOpPartitionAssignor.NAME)
.setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
.setRebalanceTimeoutMs(5000)
.setAssignedPartitions(Collections.emptyMap())
.build();

List<CoordinatorRecord> expectedRecords = List.of(
// The existing classic group tombstone.
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId),

// Create the new consumer group with member 1.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1),

// Member 2 joins the new consumer group.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),

// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),

// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Collections.emptyMap()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2),

// Member 2 has no pending revoking partition or pending release partition.
// Bump its member epoch and transition to STABLE.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)
);

assertRecordsEquals(expectedRecords, result.records());

context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get());
context.assertSessionTimeout(groupId, memberId2, 45000);
}

@Test
public void testConsumerGroupHeartbeatFromExistingClassicStaticMember() {
String groupId = "group-id";
Expand Down

0 comments on commit 8ccb26d

Please sign in to comment.