Remove casts to KRaftMetadataCache (apache#18579)
Reviewers: Mickael Maison <[email protected]>
ijuma authored Jan 17, 2025
1 parent b90ef86 commit 3996e90
Showing 4 changed files with 13 additions and 10 deletions.
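This commit removes the asInstanceOf[KRaftMetadataCache] downcasts in KafkaApis and the admin integration test by declaring describeClientQuotas and describeScramCredentials on the MetadataCache trait itself and having the trait extend ConfigRepository; KRaftMetadataCache now simply overrides those methods. The sketch below illustrates the pattern with hypothetical, simplified stand-in names (MetadataCacheSketch, plain String payloads) rather than the real Kafka request/response classes; it is a minimal illustration, not the actual Kafka code.

// Sketch: hoist an operation from the concrete cache onto the shared trait,
// so callers no longer need metadataCache.asInstanceOf[KRaftMetadataCache].
// All names below are simplified stand-ins, not the real Kafka types.

trait MetadataCacheSketch {
  // Declared on the trait, as the commit does for describeClientQuotas/describeScramCredentials.
  def describeClientQuotas(request: String): String
}

class KRaftMetadataCacheSketch extends MetadataCacheSketch {
  // The concrete KRaft-backed cache overrides the trait method.
  override def describeClientQuotas(request: String): String =
    s"quotas for $request"
}

object CastFreeCallSite {
  // The call site depends only on the trait, so no downcast is required.
  def handle(cache: MetadataCacheSketch, request: String): String =
    cache.describeClientQuotas(request)

  def main(args: Array[String]): Unit =
    println(handle(new KRaftMetadataCacheSketch, "user=alice"))
}

Making MetadataCache extend ConfigRepository follows the same idea: config lookups such as topicConfig become available on the trait, which is why the test change below can read brokers.head.metadataCache.topicConfig(topic) directly.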
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2368,7 +2368,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
describeClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
-val result = metadataCache.asInstanceOf[KRaftMetadataCache].describeClientQuotas(describeClientQuotasRequest.data())
+val result = metadataCache.describeClientQuotas(describeClientQuotasRequest.data())
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
result.setThrottleTimeMs(requestThrottleMs)
new DescribeClientQuotasResponse(result)
@@ -2383,7 +2383,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
-val result = metadataCache.asInstanceOf[KRaftMetadataCache].describeScramCredentials(describeUserScramCredentialsRequest.data())
+val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
}
10 changes: 7 additions & 3 deletions core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,9 +17,9 @@

package kafka.server

-import kafka.server.metadata.KRaftMetadataCache
+import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
import org.apache.kafka.admin.BrokerMetadata
-import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
+import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData, UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
@@ -39,7 +39,7 @@ sealed trait CachedControllerId {
case class ZkCachedControllerId(id: Int) extends CachedControllerId
case class KRaftCachedControllerId(id: Int) extends CachedControllerId

-trait MetadataCache {
+trait MetadataCache extends ConfigRepository {
/**
* Return topic metadata for a given set of topics and listener. See KafkaApis#handleTopicMetadataRequest for details
* on the use of the two boolean flags.
@@ -113,6 +113,10 @@ trait MetadataCache {
def getRandomAliveBrokerId: Option[Int]

def features(): FinalizedFeatures
+
+def describeClientQuotas(request: DescribeClientQuotasRequestData): DescribeClientQuotasResponseData
+
+def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData
}

object MetadataCache {
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -49,7 +49,7 @@ import scala.util.control.Breaks._
class KRaftMetadataCache(
val brokerId: Int,
val kraftVersionSupplier: Supplier[KRaftVersion]
-) extends MetadataCache with Logging with ConfigRepository {
+) extends MetadataCache with Logging {
this.logIdent = s"[MetadataCache brokerId=$brokerId] "

// This is the cache state. Every MetadataImage instance is immutable, and updates
@@ -539,11 +539,11 @@ class KRaftMetadataCache(
override def config(configResource: ConfigResource): Properties =
_currentImage.configs().configProperties(configResource)

-def describeClientQuotas(request: DescribeClientQuotasRequestData): DescribeClientQuotasResponseData = {
+override def describeClientQuotas(request: DescribeClientQuotasRequestData): DescribeClientQuotasResponseData = {
_currentImage.clientQuotas().describe(request)
}

-def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData = {
+override def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData = {
_currentImage.scram().describe(request)
}

3 changes: 1 addition & 2 deletions core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -28,7 +28,6 @@ import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.{Collections, Optional, Properties}
import java.{time, util}
import kafka.integration.KafkaServerTestHarness
-import kafka.server.metadata.KRaftMetadataCache
import kafka.server.KafkaConfig
import kafka.utils.TestUtils._
import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
@@ -3613,7 +3612,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

def validateLogConfig(compressionType: String): Unit = {
ensureConsistentKRaftMetadata()
-val topicProps = brokers.head.metadataCache.asInstanceOf[KRaftMetadataCache].topicConfig(topic)
+val topicProps = brokers.head.metadataCache.topicConfig(topic)
val logConfig = LogConfig.fromProps(Collections.emptyMap[String, AnyRef], topicProps)

assertEquals(compressionType, logConfig.originals.get(TopicConfig.COMPRESSION_TYPE_CONFIG))
