Skip to content

Commit

Permalink
Merge branch 'trunk' into KAFKA-18876-v4-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Mar 4, 2025
2 parents 7c3acb8 + 1df4a42 commit 07fcede
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1624,17 +1624,19 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
client = createAdminClient

val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic)
client.describeConfigs(Collections.singletonList(existingTopic)).values.get(existingTopic).get()
client.describeConfigs(util.List.of(existingTopic)).values.get(existingTopic).get()

val nonExistentTopic = new ConfigResource(ConfigResource.Type.TOPIC, "unknown")
val describeResult1 = client.describeConfigs(Collections.singletonList(nonExistentTopic))
val defaultTopic = new ConfigResource(ConfigResource.Type.TOPIC, "")
var describeResult = client.describeConfigs(util.List.of(defaultTopic))
assertFutureThrows(classOf[InvalidTopicException], describeResult.all())

assertTrue(assertThrows(classOf[ExecutionException], () => describeResult1.values.get(nonExistentTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException])
val nonExistentTopic = new ConfigResource(ConfigResource.Type.TOPIC, "unknown")
describeResult = client.describeConfigs(util.List.of(nonExistentTopic))
assertFutureThrows(classOf[UnknownTopicOrPartitionException], describeResult.all())

val invalidTopic = new ConfigResource(ConfigResource.Type.TOPIC, "(invalid topic)")
val describeResult2 = client.describeConfigs(Collections.singletonList(invalidTopic))

assertTrue(assertThrows(classOf[ExecutionException], () => describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException])
describeResult = client.describeConfigs(util.List.of(invalidTopic))
assertFutureThrows(classOf[InvalidTopicException], describeResult.all())
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature,
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.{TopicCollection, Uuid}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.Errors
Expand Down Expand Up @@ -174,7 +174,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
)

// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
// to a nonexistent topic.
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
Expand Down Expand Up @@ -214,7 +214,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
).build()

// This is the expected assignment.
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
var expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
Expand All @@ -230,6 +230,32 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)

// Delete the topic.
admin.deleteTopics(TopicCollection.ofTopicIds(List(topicId).asJava)).all.get

// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
).build()

// This is the expected assignment.
expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()

// Heartbeats until the partitions are revoked.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions revoked. Last response $consumerGroupHeartbeatResponse.")

// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
} finally {
admin.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4308,9 +4308,18 @@ public void replay(
String groupId = key.groupId();
String regex = key.regularExpression();

ConsumerGroup consumerGroup;
try {
consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
} catch (GroupIdNotFoundException ex) {
// If the group does not exist and a tombstone is replayed, we can ignore it.
return;
}

Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames().keySet());

if (value != null) {
ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, true);
group.updateResolvedRegularExpression(
consumerGroup.updateResolvedRegularExpression(
regex,
new ResolvedRegularExpression(
new HashSet<>(value.topics()),
Expand All @@ -4319,13 +4328,10 @@ public void replay(
)
);
} else {
try {
ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, false);
group.removeResolvedRegularExpression(regex);
} catch (GroupIdNotFoundException ex) {
// If the group does not exist, we can ignore the tombstone.
}
consumerGroup.removeResolvedRegularExpression(regex);
}

updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames().keySet());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16076,6 +16076,11 @@ public void testReplayConsumerGroupRegularExpression() {
Optional.of(resolvedRegularExpression),
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
);

assertEquals(
Set.of("foo"),
context.groupMetadataManager.groupsSubscribedToTopic("abc")
);
}

@Test
Expand All @@ -16101,6 +16106,11 @@ public void testReplayConsumerGroupRegularExpressionTombstone() {
resolvedRegularExpression
));

assertEquals(
Set.of("foo"),
context.groupMetadataManager.groupsSubscribedToTopic("abc")
);

context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(
"foo",
"abc*"
Expand All @@ -16110,6 +16120,11 @@ public void testReplayConsumerGroupRegularExpressionTombstone() {
Optional.empty(),
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
);

assertEquals(
Set.of(),
context.groupMetadataManager.groupsSubscribedToTopic("abc")
);
}

@Test
Expand Down

0 comments on commit 07fcede

Please sign in to comment.