diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala index 972af0414e463..fd1c70e509fff 100644 --- a/core/src/main/scala/kafka/server/ApiVersionManager.scala +++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala @@ -46,7 +46,7 @@ object ApiVersionManager { def apply( listenerType: ListenerType, config: KafkaConfig, - forwardingManager: Option[ForwardingManager], + forwardingManager: ForwardingManager, supportedFeatures: BrokerFeatures, metadataCache: MetadataCache, clientMetricsManager: Option[ClientMetricsManager] @@ -129,7 +129,7 @@ class SimpleApiVersionManager( */ class DefaultApiVersionManager( val listenerType: ListenerType, - forwardingManager: Option[ForwardingManager], + forwardingManager: ForwardingManager, brokerFeatures: BrokerFeatures, metadataCache: MetadataCache, val enableUnstableLastVersion: Boolean, @@ -143,7 +143,7 @@ class DefaultApiVersionManager( alterFeatureLevel0: Boolean ): ApiVersionsResponse = { val finalizedFeatures = metadataCache.features() - val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions) + val controllerApiVersions = forwardingManager.controllerApiVersions val clientTelemetryEnabled = clientMetricsManager match { case Some(manager) => manager.isTelemetryReceiverConfigured case None => false diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 442e6a403d107..ace134773ae2b 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -253,7 +253,7 @@ class BrokerServer( val apiVersionManager = ApiVersionManager( ListenerType.BROKER, config, - Some(forwardingManager), + forwardingManager, brokerFeatures, metadataCache, Some(clientMetricsManager) diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala index fcfd8a05ae649..341c859bf32da 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala @@ -21,7 +21,7 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.server.BrokerFeatures import org.apache.kafka.server.common.KRaftVersion -import org.junit.jupiter.api.{Disabled, Test} +import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.EnumSource @@ -36,9 +36,10 @@ class ApiVersionManagerTest { @ParameterizedTest @EnumSource(classOf[ListenerType]) def testApiScope(apiScope: ListenerType): Unit = { + val forwardingManager = Mockito.mock(classOf[ForwardingManager]) val versionManager = new DefaultApiVersionManager( listenerType = apiScope, - forwardingManager = None, + forwardingManager = forwardingManager, brokerFeatures = brokerFeatures, metadataCache = metadataCache, enableUnstableLastVersion = true @@ -54,9 +55,10 @@ class ApiVersionManagerTest { @ParameterizedTest @EnumSource(classOf[ListenerType]) def testDisabledApis(apiScope: ListenerType): Unit = { + val forwardingManager = Mockito.mock(classOf[ForwardingManager]) val versionManager = new DefaultApiVersionManager( listenerType = apiScope, - forwardingManager = None, + forwardingManager = forwardingManager, brokerFeatures = brokerFeatures, metadataCache = metadataCache, enableUnstableLastVersion = false @@ -85,7 +87,7 @@ class ApiVersionManagerTest { val versionManager = new DefaultApiVersionManager( listenerType = ListenerType.ZK_BROKER, - forwardingManager = Some(forwardingManager), + forwardingManager = forwardingManager, brokerFeatures = brokerFeatures, metadataCache = metadataCache, enableUnstableLastVersion = true @@ -103,59 +105,18 @@ class ApiVersionManagerTest { val forwardingManager = Mockito.mock(classOf[ForwardingManager]) Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None) - for (forwardingManagerOpt <- Seq(Some(forwardingManager), None)) { - val versionManager = new DefaultApiVersionManager( - listenerType = ListenerType.BROKER, - forwardingManager = forwardingManagerOpt, - brokerFeatures = brokerFeatures, - metadataCache = metadataCache, - enableUnstableLastVersion = true - ) - assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion)) - assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) - - val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false) - val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id) - assertNull(envelopeVersion) - } - } - - @Disabled("Enable after enable KIP-590 forwarding in KAFKA-12886") - @Test - def testEnvelopeEnabledWhenForwardingManagerPresent(): Unit = { - val forwardingManager = Mockito.mock(classOf[ForwardingManager]) - Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None) - val versionManager = new DefaultApiVersionManager( - listenerType = ListenerType.ZK_BROKER, - forwardingManager = Some(forwardingManager), + listenerType = ListenerType.BROKER, + forwardingManager = forwardingManager, brokerFeatures = brokerFeatures, metadataCache = metadataCache, enableUnstableLastVersion = true ) - assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion)) - assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) + assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion)) + assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false) val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id) - assertNotNull(envelopeVersion) - assertEquals(ApiKeys.ENVELOPE.oldestVersion, envelopeVersion.minVersion) - assertEquals(ApiKeys.ENVELOPE.latestVersion, envelopeVersion.maxVersion) - } - - @Test - def testEnvelopeDisabledWhenForwardingManagerEmpty(): Unit = { - val versionManager = new DefaultApiVersionManager( - listenerType = ListenerType.ZK_BROKER, - forwardingManager = None, - brokerFeatures = brokerFeatures, - metadataCache = metadataCache, - enableUnstableLastVersion = true - ) - assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion)) - assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) - - val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false) - assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)) + assertNull(envelopeVersion) } }