Skip to content

Commit

Permalink
KAFKA-18599: Remove Optional wrapping for forwardingManager in ApiVer…
Browse files Browse the repository at this point in the history
…sionManager (apache#18630)

`forwardingManager` is always present now.

Reviewers: Ismael Juma <[email protected]>
  • Loading branch information
mingyen066 authored Jan 22, 2025
1 parent 7e86bd8 commit 084fcbd
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 54 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ApiVersionManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object ApiVersionManager {
def apply(
listenerType: ListenerType,
config: KafkaConfig,
forwardingManager: Option[ForwardingManager],
forwardingManager: ForwardingManager,
supportedFeatures: BrokerFeatures,
metadataCache: MetadataCache,
clientMetricsManager: Option[ClientMetricsManager]
Expand Down Expand Up @@ -129,7 +129,7 @@ class SimpleApiVersionManager(
*/
class DefaultApiVersionManager(
val listenerType: ListenerType,
forwardingManager: Option[ForwardingManager],
forwardingManager: ForwardingManager,
brokerFeatures: BrokerFeatures,
metadataCache: MetadataCache,
val enableUnstableLastVersion: Boolean,
Expand All @@ -143,7 +143,7 @@ class DefaultApiVersionManager(
alterFeatureLevel0: Boolean
): ApiVersionsResponse = {
val finalizedFeatures = metadataCache.features()
val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
val controllerApiVersions = forwardingManager.controllerApiVersions
val clientTelemetryEnabled = clientMetricsManager match {
case Some(manager) => manager.isTelemetryReceiverConfigured
case None => false
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class BrokerServer(
val apiVersionManager = ApiVersionManager(
ListenerType.BROKER,
config,
Some(forwardingManager),
forwardingManager,
brokerFeatures,
metadataCache,
Some(clientMetricsManager)
Expand Down
61 changes: 11 additions & 50 deletions core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.common.KRaftVersion
import org.junit.jupiter.api.{Disabled, Test}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
Expand All @@ -36,9 +36,10 @@ class ApiVersionManagerTest {
@ParameterizedTest
@EnumSource(classOf[ListenerType])
def testApiScope(apiScope: ListenerType): Unit = {
val forwardingManager = Mockito.mock(classOf[ForwardingManager])
val versionManager = new DefaultApiVersionManager(
listenerType = apiScope,
forwardingManager = None,
forwardingManager = forwardingManager,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = true
Expand All @@ -54,9 +55,10 @@ class ApiVersionManagerTest {
@ParameterizedTest
@EnumSource(classOf[ListenerType])
def testDisabledApis(apiScope: ListenerType): Unit = {
val forwardingManager = Mockito.mock(classOf[ForwardingManager])
val versionManager = new DefaultApiVersionManager(
listenerType = apiScope,
forwardingManager = None,
forwardingManager = forwardingManager,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = false
Expand Down Expand Up @@ -85,7 +87,7 @@ class ApiVersionManagerTest {

val versionManager = new DefaultApiVersionManager(
listenerType = ListenerType.ZK_BROKER,
forwardingManager = Some(forwardingManager),
forwardingManager = forwardingManager,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = true
Expand All @@ -103,59 +105,18 @@ class ApiVersionManagerTest {
val forwardingManager = Mockito.mock(classOf[ForwardingManager])
Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None)

for (forwardingManagerOpt <- Seq(Some(forwardingManager), None)) {
val versionManager = new DefaultApiVersionManager(
listenerType = ListenerType.BROKER,
forwardingManager = forwardingManagerOpt,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = true
)
assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))

val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false)
val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)
assertNull(envelopeVersion)
}
}

@Disabled("Enable after enable KIP-590 forwarding in KAFKA-12886")
@Test
def testEnvelopeEnabledWhenForwardingManagerPresent(): Unit = {
val forwardingManager = Mockito.mock(classOf[ForwardingManager])
Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None)

val versionManager = new DefaultApiVersionManager(
listenerType = ListenerType.ZK_BROKER,
forwardingManager = Some(forwardingManager),
listenerType = ListenerType.BROKER,
forwardingManager = forwardingManager,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = true
)
assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))

val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false)
val envelopeVersion = apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)
assertNotNull(envelopeVersion)
assertEquals(ApiKeys.ENVELOPE.oldestVersion, envelopeVersion.minVersion)
assertEquals(ApiKeys.ENVELOPE.latestVersion, envelopeVersion.maxVersion)
}

@Test
def testEnvelopeDisabledWhenForwardingManagerEmpty(): Unit = {
val versionManager = new DefaultApiVersionManager(
listenerType = ListenerType.ZK_BROKER,
forwardingManager = None,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = true
)
assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))

val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0, false)
assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id))
assertNull(envelopeVersion)
}
}

0 comments on commit 084fcbd

Please sign in to comment.