Skip to content

Commit

Permalink
KAFKA-16513; Add test for WriteTxnMarkers with AlterCluster permission
Browse files Browse the repository at this point in the history
In apache#15837, we introduced the change to allow calling the WriteTxnMarkers API with AlterCluster permissions. This PR proposes 2 enhancements:

- When a WriteTxnMarkers request is received, it is first authorized against the alter cluster permission. If the user does not have this permission, a 'deny' will be logged. However, if the user does have the cluster action permission, the request will be successfully authorized.  Don't log the first deny to avoid confusion.
- Add a `WriteTxnMarkersRequest` to be called from the test `testAuthorizationWithTopicExisting`, so that the request can be exercised and verified with both possible permissions.

Author: Nikhil Ramakrishnan <[email protected]>

Reviewers: Christo Lolov <[email protected]>

Closes apache#15952 from nikramakrishnan/kip1037-addTest
  • Loading branch information
nikramakrishnan authored and clolov committed May 21, 2024
1 parent 52b4596 commit b5a013e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2364,7 +2364,7 @@ class KafkaApis(val requestChannel: RequestChannel,
ensureInterBrokerVersion(IBP_0_11_0_IV0)
// We are checking for AlterCluster permissions first. If it is not present, we are authorizing cluster operation
// The latter will throw an exception if it is denied.
if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
}
val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartit
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
Expand All @@ -60,6 +60,7 @@ import org.junit.jupiter.params.provider.{CsvSource, ValueSource}

import java.util.Collections.singletonList
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
import org.junit.jupiter.api.function.Executable

import scala.annotation.nowarn
Expand Down Expand Up @@ -617,6 +618,22 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
)
).build()

private def writeTxnMarkersRequest: WriteTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
new WriteTxnMarkersRequestData()
.setMarkers(
List(new WritableTxnMarker()
.setProducerId(producerId)
.setProducerEpoch(1)
.setTransactionResult(false)
.setTopics(List(new WritableTxnMarkerTopic()
.setName(tp.topic())
.setPartitionIndexes(List(Integer.valueOf(tp.partition())).asJava)
).asJava)
.setCoordinatorEpoch(1)
).asJava
)
).build()

private def sendRequests(requestKeyToRequest: mutable.Map[ApiKeys, AbstractRequest], topicExists: Boolean = true,
topicNames: Map[Uuid, String] = getTopicNames()) = {
for ((key, request) <- requestKeyToRequest) {
Expand Down Expand Up @@ -683,6 +700,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> listPartitionReassignmentsRequest,
ApiKeys.DESCRIBE_PRODUCERS -> describeProducersRequest,
ApiKeys.DESCRIBE_TRANSACTIONS -> describeTransactionsRequest,
ApiKeys.WRITE_TXN_MARKERS -> writeTxnMarkersRequest,
)
if (!isKRaftTest()) {
// Inter-broker APIs use an invalid broker epoch, so does not affect the test case
Expand Down
10 changes: 8 additions & 2 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2830,7 +2830,7 @@ class KafkaApisTest extends Logging {

val authorizer: Authorizer = mock(classOf[Authorizer])
val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val alterActions = Collections.singletonList(new Action(AclOperation.ALTER, clusterResource, 1, true, true))
val alterActions = Collections.singletonList(new Action(AclOperation.ALTER, clusterResource, 1, true, false))
val clusterActions = Collections.singletonList(new Action(AclOperation.CLUSTER_ACTION, clusterResource, 1, true, true))
val deniedList = Collections.singletonList(AuthorizationResult.DENIED)
when(authorizer.authorize(
Expand Down Expand Up @@ -3074,7 +3074,13 @@ class KafkaApisTest extends Logging {
// Allowing WriteTxnMarkers API with the help of allowedAclOperation parameter.
val authorizer: Authorizer = mock(classOf[Authorizer])
val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val allowedAction = Collections.singletonList(new Action(AclOperation.fromString(allowedAclOperation), clusterResource, 1, true, true))
val allowedAction = Collections.singletonList(new Action(
AclOperation.fromString(allowedAclOperation),
clusterResource,
1,
true,
allowedAclOperation.equals("CLUSTER_ACTION")
))
val deniedList = Collections.singletonList(AuthorizationResult.DENIED)
val allowedList = Collections.singletonList(AuthorizationResult.ALLOWED)
when(authorizer.authorize(
Expand Down

0 comments on commit b5a013e

Please sign in to comment.