From 101e15bb1c5ae9b329364eedee741e2c3bd8896c Mon Sep 17 00:00:00 2001 From: Nick Guo Date: Tue, 4 Mar 2025 14:56:25 +0800 Subject: [PATCH 1/2] KAFKA-18867 add tests to describe topic configs with empty name (#19075) Reviewers: TengYao Chi , Chia-Ping Tsai --- .../api/PlaintextAdminIntegrationTest.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 461e15aa59034..b57f4cd722c3d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1624,17 +1624,19 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { client = createAdminClient val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic) - client.describeConfigs(Collections.singletonList(existingTopic)).values.get(existingTopic).get() + client.describeConfigs(util.List.of(existingTopic)).values.get(existingTopic).get() - val nonExistentTopic = new ConfigResource(ConfigResource.Type.TOPIC, "unknown") - val describeResult1 = client.describeConfigs(Collections.singletonList(nonExistentTopic)) + val defaultTopic = new ConfigResource(ConfigResource.Type.TOPIC, "") + var describeResult = client.describeConfigs(util.List.of(defaultTopic)) + assertFutureThrows(classOf[InvalidTopicException], describeResult.all()) - assertTrue(assertThrows(classOf[ExecutionException], () => describeResult1.values.get(nonExistentTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]) + val nonExistentTopic = new ConfigResource(ConfigResource.Type.TOPIC, "unknown") + describeResult = client.describeConfigs(util.List.of(nonExistentTopic)) + assertFutureThrows(classOf[UnknownTopicOrPartitionException], describeResult.all()) val invalidTopic = new ConfigResource(ConfigResource.Type.TOPIC, "(invalid topic)") - val describeResult2 = client.describeConfigs(Collections.singletonList(invalidTopic)) - - assertTrue(assertThrows(classOf[ExecutionException], () => describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException]) + describeResult = client.describeConfigs(util.List.of(invalidTopic)) + assertFutureThrows(classOf[InvalidTopicException], describeResult.all()) } @Test From 1df4a42b40963f167fdd3e0a27c6b578dcd39c7b Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 4 Mar 2025 15:31:08 +0100 Subject: [PATCH 2/2] KAFKA-18916; Resolved regular expressions must update the group by topics data structure (#19088) When regular expressions are resolved, they do not update the group by topics data structure. Hence, topic changes (e.g. deletion) do not trigger a rebalance of the group. Reviewers: Lucas Brutschy --- .../ConsumerGroupHeartbeatRequestTest.scala | 32 +++++++++++++++++-- .../group/GroupMetadataManager.java | 22 ++++++++----- .../group/GroupMetadataManagerTest.java | 15 +++++++++ 3 files changed, 58 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 6b42c4f566a94..e94bcbc56a3fb 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} -import org.apache.kafka.common.Uuid +import org.apache.kafka.common.{TopicCollection, Uuid} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData} import org.apache.kafka.common.protocol.Errors @@ -174,7 +174,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { ) // Heartbeat request to join the group. Note that the member subscribes - // to an nonexistent topic. + // to a nonexistent topic. var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( new ConsumerGroupHeartbeatRequestData() .setGroupId("grp") @@ -214,7 +214,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { ).build() // This is the expected assignment. - val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + var expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() .setTopicId(topicId) .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) @@ -230,6 +230,32 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { // Verify the response. assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) + + // Delete the topic. + admin.deleteTopics(TopicCollection.ofTopicIds(List(topicId).asJava)).all.get + + // Prepare the next heartbeat. + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(consumerGroupHeartbeatResponse.data.memberId) + .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch) + ).build() + + // This is the expected assignment. + expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + + // Heartbeats until the partitions are revoked. + consumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && + consumerGroupHeartbeatResponse.data.assignment == expectedAssignment + }, msg = s"Could not get partitions revoked. Last response $consumerGroupHeartbeatResponse.") + + // Verify the response. + assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) } finally { admin.close() } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index f71b47690d072..d3047053a58ca 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -4308,9 +4308,18 @@ public void replay( String groupId = key.groupId(); String regex = key.regularExpression(); + ConsumerGroup consumerGroup; + try { + consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null); + } catch (GroupIdNotFoundException ex) { + // If the group does not exist and a tombstone is replayed, we can ignore it. + return; + } + + Set oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames().keySet()); + if (value != null) { - ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, true); - group.updateResolvedRegularExpression( + consumerGroup.updateResolvedRegularExpression( regex, new ResolvedRegularExpression( new HashSet<>(value.topics()), @@ -4319,13 +4328,10 @@ public void replay( ) ); } else { - try { - ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, false); - group.removeResolvedRegularExpression(regex); - } catch (GroupIdNotFoundException ex) { - // If the group does not exist, we can ignore the tombstone. - } + consumerGroup.removeResolvedRegularExpression(regex); } + + updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames().keySet()); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 4933e50506769..f83bd1c267be4 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -16076,6 +16076,11 @@ public void testReplayConsumerGroupRegularExpression() { Optional.of(resolvedRegularExpression), context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*") ); + + assertEquals( + Set.of("foo"), + context.groupMetadataManager.groupsSubscribedToTopic("abc") + ); } @Test @@ -16101,6 +16106,11 @@ public void testReplayConsumerGroupRegularExpressionTombstone() { resolvedRegularExpression )); + assertEquals( + Set.of("foo"), + context.groupMetadataManager.groupsSubscribedToTopic("abc") + ); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone( "foo", "abc*" @@ -16110,6 +16120,11 @@ public void testReplayConsumerGroupRegularExpressionTombstone() { Optional.empty(), context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*") ); + + assertEquals( + Set.of(), + context.groupMetadataManager.groupsSubscribedToTopic("abc") + ); } @Test