diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 85c46506eb871..8a599536841cc 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -569,7 +569,8 @@ class BrokerServer( config.groupMaxSessionTimeoutMs, config.offsetsRetentionCheckIntervalMs, config.offsetsRetentionMinutes * 60 * 1000L, - config.offsetCommitTimeoutMs + config.offsetCommitTimeoutMs, + config.consumerGroupMigrationPolicy ) val timer = new SystemTimerReaper( "group-coordinator-reaper", diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4f5a569f3fa78..624762b241b83 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -35,6 +35,7 @@ import org.apache.kafka.common.record.{CompressionType, LegacyRecord, Records, T import org.apache.kafka.common.security.auth.KafkaPrincipalSerde import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Utils +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.assignor.PartitionAssignor import org.apache.kafka.raft.RaftConfig @@ -247,9 +248,10 @@ object KafkaConfig { val ConsumerGroupMaxSessionTimeoutMsProp = "group.consumer.max.session.timeout.ms" val ConsumerGroupHeartbeatIntervalMsProp = "group.consumer.heartbeat.interval.ms" val ConsumerGroupMinHeartbeatIntervalMsProp = "group.consumer.min.heartbeat.interval.ms" - val ConsumerGroupMaxHeartbeatIntervalMsProp ="group.consumer.max.heartbeat.interval.ms" + val ConsumerGroupMaxHeartbeatIntervalMsProp = "group.consumer.max.heartbeat.interval.ms" val ConsumerGroupMaxSizeProp = "group.consumer.max.size" val ConsumerGroupAssignorsProp = "group.consumer.assignors" + val ConsumerGroupMigrationPolicyProp = "group.consumer.migration.policy" /** ********* Offset management configuration ***********/ val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes" @@ -611,6 +613,12 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval for registered consumers." val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single consumer group can accommodate." val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor." + val ConsumerGroupMigrationPolicyDoc = "The config that enables converting the non-empty classic group using the consumer embedded protocol to the non-empty consumer group using the consumer group protocol and vice versa; " + + "conversions of empty groups in both directions are always enabled regardless of this policy. " + + ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled, " + + ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade from classic group to consumer group is enabled, " + + ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " + + ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled."; /** ********* Offset management configuration ***********/ val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit." @@ -915,6 +923,7 @@ object KafkaConfig { .define(ConsumerGroupMaxHeartbeatIntervalMsProp, INT, Defaults.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, ConsumerGroupMaxHeartbeatIntervalMsDoc) .define(ConsumerGroupMaxSizeProp, INT, Defaults.CONSUMER_GROUP_MAX_SIZE, atLeast(1), MEDIUM, ConsumerGroupMaxSizeDoc) .define(ConsumerGroupAssignorsProp, LIST, Defaults.CONSUMER_GROUP_ASSIGNORS, null, MEDIUM, ConsumerGroupAssignorsDoc) + .defineInternal(ConsumerGroupMigrationPolicyProp, STRING, Defaults.CONSUMER_GROUP_MIGRATION_POLICY, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(classOf[ConsumerGroupMigrationPolicy]): _*), MEDIUM, ConsumerGroupMigrationPolicyDoc) /** ********* Offset management configuration ***********/ .define(OffsetMetadataMaxSizeProp, INT, Defaults.OFFSET_METADATA_MAX_SIZE, HIGH, OffsetMetadataMaxSizeDoc) @@ -1569,6 +1578,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val consumerGroupMaxHeartbeatIntervalMs = getInt(KafkaConfig.ConsumerGroupMaxHeartbeatIntervalMsProp) val consumerGroupMaxSize = getInt(KafkaConfig.ConsumerGroupMaxSizeProp) val consumerGroupAssignors = getConfiguredInstances(KafkaConfig.ConsumerGroupAssignorsProp, classOf[PartitionAssignor]) + val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(KafkaConfig.ConsumerGroupMigrationPolicyProp)) /** ********* Offset management configuration ***********/ val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 745afcf0ed341..e383cb6176af9 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -35,6 +35,7 @@ import java.net.InetSocketAddress import java.util import java.util.{Collections, Properties} import org.apache.kafka.common.Node +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1} @@ -1831,6 +1832,29 @@ class KafkaConfigTest { assertTrue(config.isNewGroupCoordinatorEnabled) } + @Test + def testConsumerGroupMigrationPolicy(): Unit = { + val props = new Properties() + props.putAll(kraftProps()) + + // Invalid GroupProtocolMigrationPolicy name. + props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, "foo") + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + + ConsumerGroupMigrationPolicy.values.foreach { policy => + props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString) + val config = KafkaConfig.fromProps(props) + assertEquals(policy, config.consumerGroupMigrationPolicy) + } + + // The config is case-insensitive. + ConsumerGroupMigrationPolicy.values.foreach { policy => + props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString.toUpperCase()) + val config = KafkaConfig.fromProps(props) + assertEquals(policy, config.consumerGroupMigrationPolicy) + } + } + @Test def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java new file mode 100644 index 0000000000000..c78ded138474a --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupMigrationPolicy { + /** Both upgrade and downgrade are enabled.*/ + BIDIRECTIONAL("bidirectional"), + + /** Only upgrade is enabled.*/ + UPGRADE("upgrade"), + + /** Only downgrade is enabled.*/ + DOWNGRADE("downgrade"), + + /** Neither upgrade nor downgrade is enabled.*/ + DISABLED("disabled"); + + private final String name; + + ConsumerGroupMigrationPolicy(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + + private final static Map NAME_TO_ENUM = Arrays.stream(values()) + .collect(Collectors.toMap(policy -> policy.name.toLowerCase(Locale.ROOT), Function.identity())); + + /** + * Parse a string into the corresponding {@code GroupProtocolMigrationPolicy} enum value, in a case-insensitive manner. + * + * @return The {{@link ConsumerGroupMigrationPolicy}} according to the string passed. None is returned if + * the string doesn't correspond to a valid policy. + */ + public static ConsumerGroupMigrationPolicy parse(String name) { + if (name == null) { + return DISABLED; + } + ConsumerGroupMigrationPolicy policy = NAME_TO_ENUM.get(name.toLowerCase(Locale.ROOT)); + + return policy == null ? DISABLED : policy; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 10fd9697e4d11..866624d48aa25 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -118,6 +118,11 @@ public class GroupCoordinatorConfig { */ public final int offsetCommitTimeoutMs; + /** + * The config indicating whether group protocol upgrade/downgrade are allowed. + */ + public final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy; + public GroupCoordinatorConfig( int numThreads, int consumerGroupSessionTimeoutMs, @@ -133,7 +138,8 @@ public GroupCoordinatorConfig( int classicGroupMaxSessionTimeoutMs, long offsetsRetentionCheckIntervalMs, long offsetsRetentionMs, - int offsetCommitTimeoutMs + int offsetCommitTimeoutMs, + ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy ) { this.numThreads = numThreads; this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs; @@ -150,5 +156,6 @@ public GroupCoordinatorConfig( this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs; this.offsetsRetentionMs = offsetsRetentionMs; this.offsetCommitTimeoutMs = offsetCommitTimeoutMs; + this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy; } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index be4a9bf7d0ade..f63b1e80ecfc1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -189,6 +189,7 @@ public GroupCoordinatorShard build() { .withClassicGroupNewMemberJoinTimeoutMs(config.classicGroupNewMemberJoinTimeoutMs) .withClassicGroupMinSessionTimeoutMs(config.classicGroupMinSessionTimeoutMs) .withClassicGroupMaxSessionTimeoutMs(config.classicGroupMaxSessionTimeoutMs) + .withGroupProtocolMigrationPolicy(config.consumerGroupMigrationPolicy) .withGroupCoordinatorMetricsShard(metricsShard) .build(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index d12ed6a9dead2..441c7b658dec2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -160,6 +160,7 @@ public static class Builder { private int classicGroupNewMemberJoinTimeoutMs = 5 * 60 * 1000; private int classicGroupMinSessionTimeoutMs; private int classicGroupMaxSessionTimeoutMs; + private ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy; private GroupCoordinatorMetricsShard metrics; Builder withLogContext(LogContext logContext) { @@ -237,6 +238,11 @@ Builder withClassicGroupMaxSessionTimeoutMs(int classicGroupMaxSessionTimeoutMs) return this; } + Builder withGroupProtocolMigrationPolicy(ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy) { + this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy; + return this; + } + Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) { this.metrics = metrics; return this; @@ -271,7 +277,8 @@ GroupMetadataManager build() { classicGroupInitialRebalanceDelayMs, classicGroupNewMemberJoinTimeoutMs, classicGroupMinSessionTimeoutMs, - classicGroupMaxSessionTimeoutMs + classicGroupMaxSessionTimeoutMs, + consumerGroupMigrationPolicy ); } } @@ -385,6 +392,11 @@ GroupMetadataManager build() { */ private final int classicGroupMaxSessionTimeoutMs; + /** + * The config indicating whether group protocol upgrade/downgrade is allowed. + */ + private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy; + private GroupMetadataManager( SnapshotRegistry snapshotRegistry, LogContext logContext, @@ -401,7 +413,8 @@ private GroupMetadataManager( int classicGroupInitialRebalanceDelayMs, int classicGroupNewMemberJoinTimeoutMs, int classicGroupMinSessionTimeoutMs, - int classicGroupMaxSessionTimeoutMs + int classicGroupMaxSessionTimeoutMs, + ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy ) { this.logContext = logContext; this.log = logContext.logger(GroupMetadataManager.class); @@ -423,6 +436,7 @@ private GroupMetadataManager( this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs; this.classicGroupMinSessionTimeoutMs = classicGroupMinSessionTimeoutMs; this.classicGroupMaxSessionTimeoutMs = classicGroupMaxSessionTimeoutMs; + this.consumerGroupMigrationPolicy = consumerGroupMigrationPolicy; } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 10ce9f50b9684..243ffa5292087 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -43,7 +43,8 @@ public void testConfigs() { 10 * 60 * 1000, 600000L, 24 * 60 * 60 * 1000L, - 5000 + 5000, + ConsumerGroupMigrationPolicy.DISABLED ); assertEquals(10, config.numThreads); @@ -83,7 +84,8 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig( 10 * 5 * 1000, offsetsRetentionCheckIntervalMs, offsetsRetentionMs, - 5000 + 5000, + ConsumerGroupMigrationPolicy.DISABLED ); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index ee2defb5ae647..3c27cfd5466a2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -130,7 +130,8 @@ private GroupCoordinatorConfig createConfig() { 10 * 5 * 1000, 600000L, 24 * 60 * 1000L, - 5000 + 5000, + ConsumerGroupMigrationPolicy.DISABLED ); } diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java b/server/src/main/java/org/apache/kafka/server/config/Defaults.java index 642d166459194..45ee9fc47e8ec 100644 --- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java +++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ListenerName; @@ -144,6 +145,7 @@ public class Defaults { UniformAssignor.class.getName(), RangeAssignor.class.getName() ); + public static final String CONSUMER_GROUP_MIGRATION_POLICY = ConsumerGroupMigrationPolicy.DISABLED.toString(); /** ********* Offset management configuration *********/ public static final int OFFSET_METADATA_MAX_SIZE = OffsetConfig.DEFAULT_MAX_METADATA_SIZE;