From 410065a65df2716e563429577dc2d3bcaa0c7c9a Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Wed, 22 Jan 2025 18:02:38 +0100 Subject: [PATCH] KAFKA-18517: Enable ConsumerBounceTest to run for new async consumer (#18532) Reviewers: Andrew Schofield , Kirk True --- .../kafka/api/ConsumerBounceTest.scala | 45 +++++++++++++------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index a8dbe0ecdaa5d..3b257e3f7a50a 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -29,7 +29,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs} import org.apache.kafka.server.util.ShutdownableThread import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, Disabled, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, Disabled, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -59,7 +59,12 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> "1", GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG -> "10", // set small enough session timeout GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG -> "0", + + // Tests will run for CONSUMER and CLASSIC group protocol, so set the group max size property + // required for each. + GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString, GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString, + ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> "false", ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG -> "true", ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG -> "50", @@ -94,7 +99,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testConsumptionWithBrokerFailures(quorum: String, groupProtocol: String): Unit = consumeWithBrokerFailures(10) /* @@ -139,7 +144,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testSeekAndCommitWithBrokerFailures(quorum: String, groupProtocol: String): Unit = seekAndCommitWithBrokerFailures(5) def seekAndCommitWithBrokerFailures(numIters: Int): Unit = { @@ -183,7 +188,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testSubscribeWhenTopicUnavailable(quorum: String, groupProtocol: String): Unit = { val numRecords = 1000 val newtopic = "newtopic" @@ -243,7 +248,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { checkCloseGoodPath(numRecords, "group1") checkCloseWithCoordinatorFailure(numRecords, "group2", "group3") - checkCloseWithClusterFailure(numRecords, "group4", "group5") + checkCloseWithClusterFailure(numRecords, "group4", "group5", groupProtocol) } /** @@ -297,12 +302,15 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { * there is no coordinator, but close should timeout and return. If close is invoked with a very * large timeout, close should timeout after request timeout. */ - private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String): Unit = { + private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String, + groupProtocol: String): Unit = { val consumer1 = createConsumerAndReceive(group1, manualAssign = false, numRecords) val requestTimeout = 6000 - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000") - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000") + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + } this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString) val consumer2 = createConsumerAndReceive(group2, manualAssign = true, numRecords) @@ -319,9 +327,10 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { * the group should be forced to rebalance when it becomes hosted on a Coordinator with the new config. * Then, 1 consumer should be left out of the group. */ - @Test + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) @Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13421) - def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = { + def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(quorum: String, groupProtocol: String): Unit = { val group = "group-max-size-test" val topic = "group-max-size-test" val maxGroupSize = 2 @@ -329,7 +338,9 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { val partitionCount = consumerCount * 2 this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + } this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount) @@ -361,12 +372,14 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { * When we have the consumer group max size configured to X, the X+1th consumer trying to join should receive a fatal exception */ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(quorum: String, groupProtocol: String): Unit = { val group = "fatal-exception-test" val topic = "fatal-exception-test" this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + } this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount) @@ -401,11 +414,15 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { */ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + // TODO: enable for all protocols after fix for not generating/blocking on unneeded + // FindCoordinator on close for the new consumer def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = { val topic = "closetest" createTopic(topic, 10, brokerCount) this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + } this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") checkCloseDuringRebalance("group1", topic, executor, brokersAvailableDuringClose = true) }