KAFKA-18888: Add KIP-877 support to Authorizer
mimaison committed Feb 28, 2025
1 parent a39fcac commit 9127ff4
Showing 37 changed files with 486 additions and 219 deletions.
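For context, since the commit message is terse: KIP-877 (Mechanism for plugins and connectors to register metrics) wraps pluggable interfaces in a Plugin<T> holder so each plugin can register its own metrics and be closed uniformly. The recurring change in this commit is mechanical: Option[Authorizer] becomes Option[Plugin[Authorizer]], the authorizer is created together with the Metrics registry, and call sites unwrap the instance with .get. A minimal Scala sketch of the pattern; Plugin.wrapInstance and its exact signature are assumptions based on KIP-877, not something shown in this diff:

import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.server.authorizer.Authorizer

// Wrap a configured authorizer so its metrics are registered and close()
// tears down both the wrapper and the underlying instance (assumed helper).
def wrapAuthorizer(authorizer: Option[Authorizer], metrics: Metrics): Option[Plugin[Authorizer]] =
  authorizer.map(a => Plugin.wrapInstance(a, metrics, "authorizer.class.name"))

// Callers keep the wrapper and unwrap per call, as the diff below does with auth.get.
def withAuthorizer[T](plugin: Option[Plugin[Authorizer]])(f: Authorizer => T): Option[T] =
  plugin.map(p => f(p.get))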
3 changes: 3 additions & 0 deletions checkstyle/import-control-metadata.xml
@@ -173,6 +173,9 @@
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.metrics.internals" />
<allow pkg="org.apache.kafka.common.metrics.stats" />
</subpackage>
<subpackage name="bootstrap">
<allow pkg="org.apache.kafka.snapshot" />
core/src/main/java/kafka/server/KafkaApisBuilder.java
@@ -31,6 +31,7 @@
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartitionManager;

+import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
@@ -57,7 +58,7 @@ public class KafkaApisBuilder {
private ConfigRepository configRepository = null;
private MetadataCache metadataCache = null;
private Metrics metrics = null;
-    private Optional<Authorizer> authorizer = Optional.empty();
+    private Optional<Plugin<Authorizer>> authorizerPlugin = Optional.empty();
private QuotaManagers quotas = null;
private FetchManager fetchManager = null;
private SharePartitionManager sharePartitionManager = null;
@@ -129,8 +130,8 @@ public KafkaApisBuilder setMetrics(Metrics metrics) {
return this;
}

-    public KafkaApisBuilder setAuthorizer(Optional<Authorizer> authorizer) {
-        this.authorizer = authorizer;
+    public KafkaApisBuilder setAuthorizerPlugin(Optional<Plugin<Authorizer>> authorizerPlugin) {
+        this.authorizerPlugin = authorizerPlugin;
return this;
}

@@ -211,7 +212,7 @@ public KafkaApis build() {
configRepository,
metadataCache,
metrics,
-                OptionConverters.toScala(authorizer),
+                OptionConverters.toScala(authorizerPlugin),
quotas,
fetchManager,
sharePartitionManager,
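On the caller side, the builder now takes the wrapped plugin as a java.util.Optional and converts back with OptionConverters.toScala for the Scala-side KafkaApis. A hedged wiring sketch; only setMetrics and setAuthorizerPlugin come from the diff, the surrounding helper is illustrative:

import scala.jdk.OptionConverters._

import kafka.server.KafkaApisBuilder
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.server.authorizer.Authorizer

def wireApis(builder: KafkaApisBuilder,
             metrics: Metrics,
             authorizerPlugin: Option[Plugin[Authorizer]]): KafkaApisBuilder =
  builder
    .setMetrics(metrics)
    .setAuthorizerPlugin(authorizerPlugin.toJava) // Scala Option -> java.util.Optional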
15 changes: 8 additions & 7 deletions core/src/main/scala/kafka/server/AclApis.scala
@@ -22,6 +22,7 @@ import kafka.utils.Logging
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclBinding
import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult
import org.apache.kafka.common.message._
@@ -43,7 +44,7 @@ import scala.jdk.OptionConverters.RichOptional
* Logic to handle ACL requests.
*/
class AclApis(authHelper: AuthHelper,
-              authorizer: Option[Authorizer],
+              authorizerPlugin: Option[Plugin[Authorizer]],
requestHelper: RequestHandlerHelper,
name: String,
config: KafkaConfig) extends Logging {
@@ -58,7 +59,7 @@ class AclApis(authHelper: AuthHelper,
def handleDescribeAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, DESCRIBE)
val describeAclsRequest = request.body[DescribeAclsRequest]
-    authorizer match {
+    authorizerPlugin match {
case None =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeAclsResponse(new DescribeAclsResponseData()
@@ -71,7 +72,7 @@
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeAclsResponse(new DescribeAclsResponseData()
.setThrottleTimeMs(requestThrottleMs)
-            .setResources(DescribeAclsResponse.aclsResources(auth.acls(filter))),
+            .setResources(DescribeAclsResponse.aclsResources(auth.get.acls(filter))),
describeAclsRequest.version))
}
CompletableFuture.completedFuture[Unit](())
@@ -81,7 +82,7 @@
authHelper.authorizeClusterOperation(request, ALTER)
val createAclsRequest = request.body[CreateAclsRequest]

-    authorizer match {
+    authorizerPlugin match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
createAclsRequest.getErrorResponse(requestThrottleMs,
new SecurityDisabledException("No Authorizer is configured.")))
Expand All @@ -106,7 +107,7 @@ class AclApis(authHelper: AuthHelper,
}

val future = new CompletableFuture[util.List[AclCreationResult]]()
-        val createResults = auth.createAcls(request.context, validBindings.asJava).asScala.map(_.toCompletableFuture)
+        val createResults = auth.get.createAcls(request.context, validBindings.asJava).asScala.map(_.toCompletableFuture)

def sendResponseCallback(): Unit = {
val aclCreationResults = allBindings.map { acl =>
@@ -136,7 +137,7 @@
def handleDeleteAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, ALTER)
val deleteAclsRequest = request.body[DeleteAclsRequest]
-    authorizer match {
+    authorizerPlugin match {
case None =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
deleteAclsRequest.getErrorResponse(requestThrottleMs,
Expand All @@ -145,7 +146,7 @@ class AclApis(authHelper: AuthHelper,
case Some(auth) =>

val future = new CompletableFuture[util.List[DeleteAclsFilterResult]]()
-        val deleteResults = auth.deleteAcls(request.context, deleteAclsRequest.filters)
+        val deleteResults = auth.get.deleteAcls(request.context, deleteAclsRequest.filters)
.asScala.map(_.toCompletableFuture).toList

def sendResponseCallback(): Unit = {
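Condensed, the new handler shape binds the Plugin in the match, so every ACL call goes through .get. A sketch of the describe path with the response plumbing reduced to an Either; only the acls call itself mirrors the diff:

import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.errors.SecurityDisabledException
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.server.authorizer.Authorizer

def describeAcls(plugin: Option[Plugin[Authorizer]],
                 filter: AclBindingFilter): Either[SecurityDisabledException, java.lang.Iterable[AclBinding]] =
  plugin match {
    case None       => Left(new SecurityDisabledException("No Authorizer is configured."))
    case Some(auth) => Right(auth.get.acls(filter)) // auth is the Plugin; .get unwraps it
  }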
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/server/AuthHelper.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.EndpointType
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.acl.AclOperation.DESCRIBE
import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.DescribeClusterResponseData
import org.apache.kafka.common.message.DescribeClusterResponseData.DescribeClusterBrokerCollection
import org.apache.kafka.common.protocol.Errors
@@ -38,7 +39,7 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authoriz
import scala.collection.Seq
import scala.jdk.CollectionConverters._

-class AuthHelper(authorizer: Option[Authorizer]) {
+class AuthHelper(authorizer: Option[Plugin[Authorizer]]) {
def authorize(requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType,
@@ -49,7 +50,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
authorizer.forall { authZ =>
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+      authZ.get.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
}
}

@@ -64,7 +65,7 @@
case Some(authZ) =>
val resourcePattern = new ResourcePattern(resource.resourceType, resource.name, PatternType.LITERAL)
val actions = supportedOps.map { op => new Action(op, resourcePattern, 1, false, false) }
-        authZ.authorize(request.context, actions.asJava).asScala
+        authZ.get.authorize(request.context, actions.asJava).asScala
.zip(supportedOps)
.filter(_._1 == AuthorizationResult.ALLOWED)
.map(_._2).toSet
@@ -77,7 +78,7 @@
def authorizeByResourceType(requestContext: RequestContext, operation: AclOperation,
resourceType: ResourceType): Boolean = {
authorizer.forall { authZ =>
-      authZ.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED
+      authZ.get.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED
}
}

@@ -109,7 +110,7 @@
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
new Action(operation, resource, count, logIfAllowed, logIfDenied)
}.toBuffer
-      authZ.authorize(requestContext, actions.asJava).asScala
+      authZ.get.authorize(requestContext, actions.asJava).asScala
.zip(resourceNameToCount.keySet)
.collect { case (authzResult, resourceName) if authzResult == AuthorizationResult.ALLOWED =>
resourceName
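Note the Option.forall in authorize: with no authorizer configured the check returns true, so authorization stays effectively disabled, and the Plugin indirection does not change that. A condensed sketch of the single-action path (log flags and reference count simplified):

import java.util.Collections

import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}

def isAllowed(plugin: Option[Plugin[Authorizer]],
              ctx: AuthorizableRequestContext,
              op: AclOperation,
              resourceType: ResourceType,
              resourceName: String): Boolean =
  plugin.forall { p =>
    val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
    val actions = Collections.singletonList(new Action(op, resource, 1, true, true))
    p.get.authorize(ctx, actions).get(0) == AuthorizationResult.ALLOWED // None => allowed
  }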
16 changes: 8 additions & 8 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -27,6 +27,7 @@ import kafka.server.metadata._
import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl, SharePartitionManager}
import kafka.utils.CoreUtils
import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
@@ -102,7 +103,7 @@ class BrokerServer(

@volatile var dataPlaneRequestProcessor: KafkaApis = _

-  var authorizer: Option[Authorizer] = None
+  var authorizerPlugin: Option[Plugin[Authorizer]] = None
@volatile var socketServer: SocketServer = _
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = _

@@ -410,8 +411,7 @@
)

// Create and initialize an authorizer if one is configured.
-    authorizer = config.createNewAuthorizer()
-    authorizer.foreach(_.configure(config.originals))
+    authorizerPlugin = config.createNewAuthorizer(metrics)

// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
// for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
@@ -455,7 +455,7 @@
configRepository = metadataCache,
metadataCache = metadataCache,
metrics = metrics,
-      authorizer = authorizer,
+      authorizerPlugin = authorizerPlugin,
quotas = quotaManagers,
fetchManager = fetchManager,
sharePartitionManager = sharePartitionManager,
@@ -528,7 +528,7 @@
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
-        authorizer
+        authorizerPlugin
),
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler
@@ -585,7 +585,7 @@
// authorizer future is completed.
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
-      builder.build(authorizer.toJava,
+      builder.build(authorizerPlugin.toJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
@@ -645,7 +645,7 @@
.withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
.withGroupConfigManager(groupConfigManager)
.withPersister(persister)
-          .withAuthorizer(authorizer.toJava)
+          .withAuthorizerPlugin(authorizerPlugin.toJava)
.build()
} else {
GroupCoordinatorAdapter(
@@ -773,7 +773,7 @@
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (dataPlaneRequestProcessor != null)
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
-    authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
+    authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer"))

/**
* We must shutdown the scheduler early because otherwise, the scheduler could touch other
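Note that createNewAuthorizer now takes the Metrics registry and the explicit configure call is gone, implying the config helper instantiates, configures, and wraps in one step. Its body is not part of this diff; a plausible sketch under that assumption:

import kafka.server.KafkaConfig
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.Authorizer

// Assumed shape of KafkaConfig.createNewAuthorizer(metrics); the real method
// is not shown in this commit excerpt.
def createNewAuthorizer(config: KafkaConfig, metrics: Metrics): Option[Plugin[Authorizer]] =
  Option(config.getString("authorizer.class.name")).filter(_.nonEmpty).map { className =>
    val authorizer = Utils.newInstance(className, classOf[Authorizer])
    authorizer.configure(config.originals)
    Plugin.wrapInstance(authorizer, metrics, "authorizer.class.name")
  }

Since Plugin is AutoCloseable, the shutdown path above still works: closing the plugin closes its metrics and the wrapped authorizer together.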
9 changes: 4 additions & 5 deletions core/src/main/scala/kafka/server/ControllerApis.scala
@@ -34,8 +34,7 @@ import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, CREATE_TOKENS, DELETE, DESCRIBE, DESCRIBE_CONFIGS}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException, UnsupportedVersionException}
-import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.internals.{FatalExitError, Plugin, Topic}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
@@ -67,7 +66,7 @@
*/
class ControllerApis(
val requestChannel: RequestChannel,
-  val authorizer: Option[Authorizer],
+  val authorizerPlugin: Option[Plugin[Authorizer]],
val quotas: QuotaManagers,
val time: Time,
val controller: Controller,
@@ -80,11 +79,11 @@
) extends ApiRequestHandler with Logging {

this.logIdent = s"[ControllerApis nodeId=${config.nodeId}] "
-  val authHelper = new AuthHelper(authorizer)
+  val authHelper = new AuthHelper(authorizerPlugin)
val configHelper = new ConfigHelper(metadataCache, config, metadataCache)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
val runtimeLoggerManager = new RuntimeLoggerManager(config.nodeId, logger.underlying)
private val aclApis = new AclApis(authHelper, authorizer, requestHelper, "controller", config)
private val aclApis = new AclApis(authHelper, authorizerPlugin, requestHelper, "controller", config)

def isClosed: Boolean = aclApis.isClosed

22 changes: 12 additions & 10 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -24,6 +24,7 @@ import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.metrics.Metrics
@@ -81,7 +82,7 @@ class ControllerServer(
var status: ProcessStatus = SHUTDOWN

var linuxIoMetricsCollector: LinuxIoMetricsCollector = _
-  @volatile var authorizer: Option[Authorizer] = None
+  @volatile var authorizerPlugin: Option[Plugin[Authorizer]] = None
var tokenCache: DelegationTokenCache = _
var credentialProvider: CredentialProvider = _
var socketServer: SocketServer = _
@@ -137,8 +138,7 @@
metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
}

-    authorizer = config.createNewAuthorizer()
-    authorizer.foreach(_.configure(config.originals))
+    authorizerPlugin = config.createNewAuthorizer(metrics)

metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())

@@ -178,7 +178,7 @@

val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
-      builder.build(authorizer.toJava,
+      builder.build(authorizerPlugin.toJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
@@ -256,9 +256,11 @@

// If we are using a ClusterMetadataAuthorizer, requests to add or remove ACLs must go
// through the controller.
-    authorizer match {
-      case Some(a: ClusterMetadataAuthorizer) => a.setAclMutator(controller)
-      case _ =>
+    authorizerPlugin.foreach { plugin =>
+      plugin.get match {
+        case a: ClusterMetadataAuthorizer => a.setAclMutator(controller)
+        case _ =>
+      }
     }

quotaManagers = QuotaFactory.instantiate(config,
@@ -267,7 +269,7 @@
s"controller-${config.nodeId}-")
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
-      authorizer,
+      authorizerPlugin,
quotaManagers,
time,
controller,
@@ -373,7 +375,7 @@
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
-      authorizer
+      authorizerPlugin
))

// Install all metadata publishers.
@@ -465,7 +467,7 @@
CoreUtils.swallow(quotaManagers.shutdown(), this)
Utils.closeQuietly(controller, "controller")
Utils.closeQuietly(quorumControllerMetrics, "quorum controller metrics")
-    authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
+    authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer"))
createTopicPolicy.foreach(policy => Utils.closeQuietly(policy, "create topic policy"))
alterConfigPolicy.foreach(policy => Utils.closeQuietly(policy, "alter config policy"))
socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
16 changes: 10 additions & 6 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -231,9 +231,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
* directly. They are provided both old and new configs.
*/
def addReconfigurables(kafkaServer: KafkaBroker): Unit = {
-    kafkaServer.authorizer match {
-      case Some(authz: Reconfigurable) => addReconfigurable(authz)
-      case _ =>
+    kafkaServer.authorizerPlugin.foreach { plugin =>
+      plugin.get match {
+        case authz: Reconfigurable => addReconfigurable(authz)
+        case _ =>
+      }
     }
addReconfigurable(kafkaServer.kafkaYammerMetrics)
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
@@ -251,9 +253,11 @@
* Add reconfigurables to be notified when a dynamic controller config is updated.
*/
def addReconfigurables(controller: ControllerServer): Unit = {
-    controller.authorizer match {
-      case Some(authz: Reconfigurable) => addReconfigurable(authz)
-      case _ =>
+    controller.authorizerPlugin.foreach { plugin =>
+      plugin.get match {
+        case authz: Reconfigurable => addReconfigurable(authz)
+        case _ =>
+      }
     }
if (!kafkaConfig.processRoles.contains(ProcessRole.BrokerRole)) {
// only add these if the controller isn't also running the broker role
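Because the wrapper hides the concrete type, capability checks (Reconfigurable here, ClusterMetadataAuthorizer in ControllerServer above) have to unwrap first, hence the foreach plus plugin.get nesting. A small generic helper one could factor out; illustrative only, not part of the commit:

import org.apache.kafka.common.internals.Plugin

def withCapability[C](plugin: Option[Plugin[_]], capability: Class[C])(f: C => Unit): Unit =
  plugin.map(_.get).foreach {
    case c if capability.isInstance(c) => f(capability.cast(c))
    case _ => // underlying plugin does not implement the capability
  }

// usage sketch: withCapability(authorizerPlugin, classOf[Reconfigurable])(addReconfigurable)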