Skip to content

Commit

Permalink
KAFKA-16294: Add group protocol migration enabling config (apache#15411)
Browse files Browse the repository at this point in the history
This patch adds the `group.consumer.migration.policy` config which controls how consumer groups can be converted from classic group to consumer group and vice versa. The config is kept as an internal one while we develop the feature.

Reviewers: Jeff Kim <[email protected]>, David Jacot <[email protected]>
  • Loading branch information
dongnuo123 authored Apr 10, 2024
1 parent dc9fbe4 commit 619f270
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 8 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@ class BrokerServer(
config.groupMaxSessionTimeoutMs,
config.offsetsRetentionCheckIntervalMs,
config.offsetsRetentionMinutes * 60 * 1000L,
config.offsetCommitTimeoutMs
config.offsetCommitTimeoutMs,
config.consumerGroupMigrationPolicy
)
val timer = new SystemTimerReaper(
"group-coordinator-reaper",
Expand Down
12 changes: 11 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.kafka.common.record.{CompressionType, LegacyRecord, Records, T
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
import org.apache.kafka.raft.RaftConfig
Expand Down Expand Up @@ -247,9 +248,10 @@ object KafkaConfig {
val ConsumerGroupMaxSessionTimeoutMsProp = "group.consumer.max.session.timeout.ms"
val ConsumerGroupHeartbeatIntervalMsProp = "group.consumer.heartbeat.interval.ms"
val ConsumerGroupMinHeartbeatIntervalMsProp = "group.consumer.min.heartbeat.interval.ms"
val ConsumerGroupMaxHeartbeatIntervalMsProp ="group.consumer.max.heartbeat.interval.ms"
val ConsumerGroupMaxHeartbeatIntervalMsProp = "group.consumer.max.heartbeat.interval.ms"
val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
val ConsumerGroupAssignorsProp = "group.consumer.assignors"
val ConsumerGroupMigrationPolicyProp = "group.consumer.migration.policy"

/** ********* Offset management configuration ***********/
val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
Expand Down Expand Up @@ -611,6 +613,12 @@ object KafkaConfig {
val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval for registered consumers."
val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single consumer group can accommodate."
val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor."
val ConsumerGroupMigrationPolicyDoc = "The config that enables converting the non-empty classic group using the consumer embedded protocol to the non-empty consumer group using the consumer group protocol and vice versa; " +
"conversions of empty groups in both directions are always enabled regardless of this policy. " +
ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled, " +
ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade from classic group to consumer group is enabled, " +
ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " +
ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled.";

/** ********* Offset management configuration ***********/
val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit."
Expand Down Expand Up @@ -915,6 +923,7 @@ object KafkaConfig {
.define(ConsumerGroupMaxHeartbeatIntervalMsProp, INT, Defaults.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, ConsumerGroupMaxHeartbeatIntervalMsDoc)
.define(ConsumerGroupMaxSizeProp, INT, Defaults.CONSUMER_GROUP_MAX_SIZE, atLeast(1), MEDIUM, ConsumerGroupMaxSizeDoc)
.define(ConsumerGroupAssignorsProp, LIST, Defaults.CONSUMER_GROUP_ASSIGNORS, null, MEDIUM, ConsumerGroupAssignorsDoc)
.defineInternal(ConsumerGroupMigrationPolicyProp, STRING, Defaults.CONSUMER_GROUP_MIGRATION_POLICY, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(classOf[ConsumerGroupMigrationPolicy]): _*), MEDIUM, ConsumerGroupMigrationPolicyDoc)

/** ********* Offset management configuration ***********/
.define(OffsetMetadataMaxSizeProp, INT, Defaults.OFFSET_METADATA_MAX_SIZE, HIGH, OffsetMetadataMaxSizeDoc)
Expand Down Expand Up @@ -1569,6 +1578,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val consumerGroupMaxHeartbeatIntervalMs = getInt(KafkaConfig.ConsumerGroupMaxHeartbeatIntervalMsProp)
val consumerGroupMaxSize = getInt(KafkaConfig.ConsumerGroupMaxSizeProp)
val consumerGroupAssignors = getConfiguredInstances(KafkaConfig.ConsumerGroupAssignorsProp, classOf[PartitionAssignor])
val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(KafkaConfig.ConsumerGroupMigrationPolicyProp))

/** ********* Offset management configuration ***********/
val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)
Expand Down
24 changes: 24 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import java.net.InetSocketAddress
import java.util
import java.util.{Collections, Properties}
import org.apache.kafka.common.Node
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
Expand Down Expand Up @@ -1831,6 +1832,29 @@ class KafkaConfigTest {
assertTrue(config.isNewGroupCoordinatorEnabled)
}

@Test
def testConsumerGroupMigrationPolicy(): Unit = {
val props = new Properties()
props.putAll(kraftProps())

// Invalid GroupProtocolMigrationPolicy name.
props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, "foo")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))

ConsumerGroupMigrationPolicy.values.foreach { policy =>
props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString)
val config = KafkaConfig.fromProps(props)
assertEquals(policy, config.consumerGroupMigrationPolicy)
}

// The config is case-insensitive.
ConsumerGroupMigrationPolicy.values.foreach { policy =>
props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString.toUpperCase())
val config = KafkaConfig.fromProps(props)
assertEquals(policy, config.consumerGroupMigrationPolicy)
}
}

@Test
def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.coordinator.group;

import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum ConsumerGroupMigrationPolicy {
/** Both upgrade and downgrade are enabled.*/
BIDIRECTIONAL("bidirectional"),

/** Only upgrade is enabled.*/
UPGRADE("upgrade"),

/** Only downgrade is enabled.*/
DOWNGRADE("downgrade"),

/** Neither upgrade nor downgrade is enabled.*/
DISABLED("disabled");

private final String name;

ConsumerGroupMigrationPolicy(String name) {
this.name = name;
}

@Override
public String toString() {
return name;
}

private final static Map<String, ConsumerGroupMigrationPolicy> NAME_TO_ENUM = Arrays.stream(values())
.collect(Collectors.toMap(policy -> policy.name.toLowerCase(Locale.ROOT), Function.identity()));

/**
* Parse a string into the corresponding {@code GroupProtocolMigrationPolicy} enum value, in a case-insensitive manner.
*
* @return The {{@link ConsumerGroupMigrationPolicy}} according to the string passed. None is returned if
* the string doesn't correspond to a valid policy.
*/
public static ConsumerGroupMigrationPolicy parse(String name) {
if (name == null) {
return DISABLED;
}
ConsumerGroupMigrationPolicy policy = NAME_TO_ENUM.get(name.toLowerCase(Locale.ROOT));

return policy == null ? DISABLED : policy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public class GroupCoordinatorConfig {
*/
public final int offsetCommitTimeoutMs;

/**
* The config indicating whether group protocol upgrade/downgrade are allowed.
*/
public final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;

public GroupCoordinatorConfig(
int numThreads,
int consumerGroupSessionTimeoutMs,
Expand All @@ -133,7 +138,8 @@ public GroupCoordinatorConfig(
int classicGroupMaxSessionTimeoutMs,
long offsetsRetentionCheckIntervalMs,
long offsetsRetentionMs,
int offsetCommitTimeoutMs
int offsetCommitTimeoutMs,
ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy
) {
this.numThreads = numThreads;
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
Expand All @@ -150,5 +156,6 @@ public GroupCoordinatorConfig(
this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs;
this.offsetsRetentionMs = offsetsRetentionMs;
this.offsetCommitTimeoutMs = offsetCommitTimeoutMs;
this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public GroupCoordinatorShard build() {
.withClassicGroupNewMemberJoinTimeoutMs(config.classicGroupNewMemberJoinTimeoutMs)
.withClassicGroupMinSessionTimeoutMs(config.classicGroupMinSessionTimeoutMs)
.withClassicGroupMaxSessionTimeoutMs(config.classicGroupMaxSessionTimeoutMs)
.withGroupProtocolMigrationPolicy(config.consumerGroupMigrationPolicy)
.withGroupCoordinatorMetricsShard(metricsShard)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ public static class Builder {
private int classicGroupNewMemberJoinTimeoutMs = 5 * 60 * 1000;
private int classicGroupMinSessionTimeoutMs;
private int classicGroupMaxSessionTimeoutMs;
private ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;
private GroupCoordinatorMetricsShard metrics;

Builder withLogContext(LogContext logContext) {
Expand Down Expand Up @@ -237,6 +238,11 @@ Builder withClassicGroupMaxSessionTimeoutMs(int classicGroupMaxSessionTimeoutMs)
return this;
}

Builder withGroupProtocolMigrationPolicy(ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy) {
this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy;
return this;
}

Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
this.metrics = metrics;
return this;
Expand Down Expand Up @@ -271,7 +277,8 @@ GroupMetadataManager build() {
classicGroupInitialRebalanceDelayMs,
classicGroupNewMemberJoinTimeoutMs,
classicGroupMinSessionTimeoutMs,
classicGroupMaxSessionTimeoutMs
classicGroupMaxSessionTimeoutMs,
consumerGroupMigrationPolicy
);
}
}
Expand Down Expand Up @@ -385,6 +392,11 @@ GroupMetadataManager build() {
*/
private final int classicGroupMaxSessionTimeoutMs;

/**
* The config indicating whether group protocol upgrade/downgrade is allowed.
*/
private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;

private GroupMetadataManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext,
Expand All @@ -401,7 +413,8 @@ private GroupMetadataManager(
int classicGroupInitialRebalanceDelayMs,
int classicGroupNewMemberJoinTimeoutMs,
int classicGroupMinSessionTimeoutMs,
int classicGroupMaxSessionTimeoutMs
int classicGroupMaxSessionTimeoutMs,
ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy
) {
this.logContext = logContext;
this.log = logContext.logger(GroupMetadataManager.class);
Expand All @@ -423,6 +436,7 @@ private GroupMetadataManager(
this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs;
this.classicGroupMinSessionTimeoutMs = classicGroupMinSessionTimeoutMs;
this.classicGroupMaxSessionTimeoutMs = classicGroupMaxSessionTimeoutMs;
this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public void testConfigs() {
10 * 60 * 1000,
600000L,
24 * 60 * 60 * 1000L,
5000
5000,
ConsumerGroupMigrationPolicy.DISABLED
);

assertEquals(10, config.numThreads);
Expand Down Expand Up @@ -83,7 +84,8 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig(
10 * 5 * 1000,
offsetsRetentionCheckIntervalMs,
offsetsRetentionMs,
5000
5000,
ConsumerGroupMigrationPolicy.DISABLED
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ private GroupCoordinatorConfig createConfig() {
10 * 5 * 1000,
600000L,
24 * 60 * 1000L,
5000
5000,
ConsumerGroupMigrationPolicy.DISABLED
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ListenerName;
Expand Down Expand Up @@ -144,6 +145,7 @@ public class Defaults {
UniformAssignor.class.getName(),
RangeAssignor.class.getName()
);
public static final String CONSUMER_GROUP_MIGRATION_POLICY = ConsumerGroupMigrationPolicy.DISABLED.toString();

/** ********* Offset management configuration *********/
public static final int OFFSET_METADATA_MAX_SIZE = OffsetConfig.DEFAULT_MAX_METADATA_SIZE;
Expand Down

0 comments on commit 619f270

Please sign in to comment.