Skip to content

Commit

Permalink
KAFKA-14516: [3/3] Integration Test - Static Member Removed After Ses…
Browse files Browse the repository at this point in the history
…sion Timeout (apache#14911)

This new integration test verifies that a static member who temporary left the group is removed after the session timeout expires. It also verifies that a new static member with the same instance id can't join the group until the previous static member is expired.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
vamossagar12 authored Dec 8, 2023
1 parent 0ad059d commit e6e7d8c
Showing 1 changed file with 108 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,114 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
}

@ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "5000"),
new ClusterConfigProperty(key = "group.consumer.min.session.timeout.ms", value = "5000")
))
def testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
val instanceId = "instanceId"

// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
controllers = raftCluster.controllerServers().asScala.toSeq
)

// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava)
).build()

// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")

// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)

// Create the topic.
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)

// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
).build()

// This is the expected assignment.
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)

// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")

// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)

// A new static member tries to join the group with an inuse instanceid.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava)
).build()

// Validating that trying to join with an in-use instanceId would throw an UnreleasedInstanceIdException.
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
assertEquals(Errors.UNRELEASED_INSTANCE_ID.code, consumerGroupHeartbeatResponse.data.errorCode)

// The new static member join group will keep failing with an UnreleasedInstanceIdException
// until eventually it gets through because the existing member will be kicked out
// because of not sending a heartbeat till session timeout expiry.
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not re-join the group successfully. Last response $consumerGroupHeartbeatResponse.")

// Verify the response. The group epoch bumps upto 4 which eventually reflects in the new member epoch.
assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
}

private def connectAndReceive(request: ConsumerGroupHeartbeatRequest): ConsumerGroupHeartbeatResponse = {
IntegrationTestUtils.connectAndReceive[ConsumerGroupHeartbeatResponse](
request,
Expand Down

0 comments on commit e6e7d8c

Please sign in to comment.