KAFKA-15853 Move consumer group and group coordinator configs out of core (apache#15684)

Reviewers: Chia-Ping Tsai <[email protected]>
OmniaGM authored Apr 17, 2024
1 parent a9f65a5 commit 363f4d2
Showing 43 changed files with 399 additions and 381 deletions.
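
Across the test sources below the change follows a single pattern: group coordinator and offsets-topic keys that used to be referenced through core's Scala KafkaConfig properties are now read from the GroupCoordinatorConfig constants owned by the group-coordinator module. A minimal Scala sketch of the before/after, using only constants that appear in the diffs below:

    import java.util.Properties
    import org.apache.kafka.coordinator.group.GroupCoordinatorConfig

    val props = new Properties()
    // Before: keys came from core, e.g. props.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
    // After: the same keys are resolved via the group-coordinator module's constants
    props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
    props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
    props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")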
6 changes: 6 additions & 0 deletions build.gradle
@@ -2033,6 +2033,7 @@ project(':tools') {
testImplementation project(':connect:runtime')
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.main.output
testImplementation project(':group-coordinator')
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
@@ -2190,6 +2191,7 @@ project(':streams') {
testImplementation project(':tools')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage')
testImplementation project(':group-coordinator')
testImplementation project(':transaction-coordinator')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
@@ -2202,6 +2204,7 @@ project(':streams') {
testImplementation libs.hamcrest
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
testImplementation project(':group-coordinator')

testRuntimeOnly project(':streams:test-utils')
testRuntimeOnly libs.slf4jlog4j
@@ -2338,6 +2341,7 @@ project(':streams:streams-scala') {
// So we make sure to not include it in the dependencies.
api libs.scalaCollectionCompat
}
testImplementation project(':group-coordinator')
testImplementation project(':core')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
@@ -3006,8 +3010,10 @@ project(':connect:runtime') {
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
testImplementation project(':server')
testImplementation project(':group-coordinator')
testImplementation project(':storage')
testImplementation project(':connect:test-plugins')
testImplementation project(':group-coordinator')

testImplementation libs.easymock
testImplementation libs.junitJupiterApi
5 changes: 4 additions & 1 deletion checkstyle/import-control.xml
@@ -324,8 +324,8 @@

<subpackage name="consumer">
<allow pkg="org.apache.kafka.tools"/>

<subpackage name="group">
<allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="kafka.api"/>
<allow pkg="kafka.security"/>
<allow pkg="kafka.zk" />
@@ -415,6 +415,7 @@
<allow pkg="org.apache.kafka.server.config" />
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
<allow class="org.apache.kafka.coordinator.transaction.TransactionLogConfigs" />
<allow pkg="org.apache.kafka.coordinator.group" />
</subpackage>

<subpackage name="test">
@@ -485,6 +486,8 @@
<allow pkg="org.apache.kafka.connect.components"/>
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.test"/>
<!-- for testing -->
<allow pkg="org.apache.kafka.coordinator.group" />

<subpackage name="source">
<allow pkg="org.apache.kafka.connect.connector" />
@@ -54,6 +54,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
@@ -160,8 +161,8 @@ private void doStart() {
brokerConfig.put(ZkConfigs.ZK_CONNECT_CONFIG, zKConnectString());

putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0);
putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) brokers.length);
putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
// reduce the size of the log cleaner map to reduce test memory usage
putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
211 changes: 66 additions & 145 deletions core/src/main/scala/kafka/server/KafkaConfig.scala

Large diffs are not rendered by default.
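
The bulk of the 145 deletions sits in KafkaConfig.scala, whose diff is not rendered here. As a hedged sketch only (not the actual file contents), the direction implied by the rest of this commit is that core stops redeclaring the group/offsets property names and defaults and reuses the shared constants instead:

    import org.apache.kafka.coordinator.group.GroupCoordinatorConfig

    // Illustrative only: the key and default constants are now owned by the
    // group-coordinator module and merely referenced from core, e.g.
    val maxSessionTimeoutKey: String  = GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
    val maxSessionTimeoutDefault: Int = GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT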

@@ -28,6 +28,7 @@ import org.apache.kafka.common.resource.{Resource, ResourcePattern}
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, TRANSACTIONAL_ID}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.junit.jupiter.api.{BeforeEach, TestInfo}
@@ -110,8 +111,8 @@ class AbstractAuthorizerIntegrationTest extends BaseRequestTest {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
}

properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
@@ -19,7 +19,6 @@ package kafka.api
import java.time.Duration
import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.record.TimestampType
@@ -33,6 +32,7 @@ import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, Buffer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.util.ShutdownableThread

import scala.collection.mutable
@@ -67,11 +67,11 @@ abstract class AbstractConsumerTest extends BaseRequestTest {

override protected def brokerPropertyOverrides(properties: Properties): Unit = {
properties.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
properties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
properties.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
properties.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, groupMaxSessionTimeoutMs.toString)
properties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10")
properties.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "3") // don't want to lose offset
properties.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.setProperty(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "100") // set small enough session timeout
properties.setProperty(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, groupMaxSessionTimeoutMs.toString)
properties.setProperty(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "10")
}

@BeforeEach
@@ -28,6 +28,7 @@ import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.errors.{TopicExistsException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.KafkaSecurityConfigs
import org.apache.kafka.server.config.ReplicationConfigs
@@ -204,7 +205,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
}
configs.foreach { config =>
config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
config.setProperty(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
config.setProperty(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, "false")
config.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
// We set this in order to test that we don't expose sensitive data via describe configs. This will already be
11 changes: 6 additions & 5 deletions core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.quota.ClientQuotaAlteration
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.config.ClientQuotaManagerConfig
import org.junit.jupiter.api.Assertions._
@@ -51,11 +52,11 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
protected def createQuotaTestClients(topic: String, leaderNode: KafkaBroker): QuotaTestClients

this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false")
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100")
this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "60000")
this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
this.serverConfig.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "2")
this.serverConfig.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
this.serverConfig.setProperty(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "100")
this.serverConfig.setProperty(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "60000")
this.serverConfig.setProperty(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "-1")
this.producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "300000")
this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.GroupMaxSizeReachedException
import org.apache.kafka.common.message.FindCoordinatorRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.Assertions._
@@ -54,11 +55,11 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {

private def generateKafkaConfigs(maxGroupSize: String = maxGroupSize.toString): Seq[KafkaConfig] = {
val properties = new Properties
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
properties.put(KafkaConfig.GroupMaxSizeProp, maxGroupSize)
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "3") // don't want to lose offset
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "10") // set small enough session timeout
properties.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
properties.put(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, maxGroupSize)
properties.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")

@@ -37,6 +37,7 @@ import org.apache.kafka.common.resource._
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth._
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.{KafkaSecurityConfigs, ZkConfigs}
import org.apache.kafka.server.config.ReplicationConfigs
@@ -134,8 +135,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW))

// Some needed configuration for brokers, producers, and consumers
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3")
this.serverConfig.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
this.serverConfig.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "3")
this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3")
this.serverConfig.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "3")
this.serverConfig.setProperty(KafkaSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG, "1500")
@@ -29,6 +29,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.junit.jupiter.api.Assertions._
@@ -87,8 +88,8 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
}

properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
properties.put(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
@@ -24,15 +24,15 @@ import org.junit.jupiter.params.provider.ValueSource

import scala.jdk.CollectionConverters._
import java.util.Properties

import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig

class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
val offsetsTopicCompressionCodec = CompressionType.GZIP
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.id.toString)
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, offsetsTopicCompressionCodec.id.toString)

override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnectOrNull, enableControlledShutdown = false).map {
KafkaConfig.fromProps(_, overridingProps)
@@ -29,6 +29,7 @@ import kafka.integration.KafkaServerTestHarness
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.ReplicationConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}

@@ -69,7 +70,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
cfgs.foreach(_.setProperty(KafkaConfig.MigrationEnabledProp, "true"))
}
if (isNewGroupCoordinatorEnabled()) {
cfgs.foreach(_.setProperty(KafkaConfig.NewGroupCoordinatorEnableProp, "true"))
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"))
}

if(isKRaftTest()) {
@@ -22,6 +22,7 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
@@ -38,7 +39,7 @@ class LogAppendTimeTest extends IntegrationTestHarness {

// This will be used for the offsets topic as well
serverConfig.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.name)
serverConfig.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
serverConfig.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "2")

private val topic = "topic"

@@ -43,8 +43,9 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{Defaults, KafkaSecurityConfigs, ZkConfigs}
import org.apache.kafka.server.config.{KafkaSecurityConfigs, ZkConfigs}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
@@ -1353,7 +1354,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
// Increase timeouts to avoid having a rebalance during the test
newConsumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE.toString)
newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Defaults.GROUP_MAX_SESSION_TIMEOUT_MS.toString)
newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT.toString)
val consumer = createConsumer(configOverrides = newConsumerConfig)

try {
