Skip to content

Commit

Permalink
[improve][broker] do not grant permission for each partition to reduc…
Browse files Browse the repository at this point in the history
…e unnecessary zk metadata (apache#18222)

Co-authored-by: fanjianye <[email protected]>
  • Loading branch information
TakaHiR07 and fanjianye authored Jul 3, 2023
1 parent 00c3745 commit e8a3011
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,20 +299,9 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
getPartitionedTopicMetadataAsync(topicName, true, false)
.thenCompose(metadata -> {
int numPartitions = metadata.partitions;
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (numPartitions > 0) {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
future = future.thenCompose(unused -> grantPermissionsAsync(topicNamePartition, role,
actions));
}
}
return future.thenCompose(unused -> grantPermissionsAsync(topicName, role, actions))
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
}))).exceptionally(ex -> {
grantPermissionsAsync(topicName, role, actions)
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))))
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause);
resumeAsyncResponseExceptionally(asyncResponse, realCause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,19 +930,6 @@ public void testGrantPartitionedTopic() {
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Map<String, Set<AuthAction>> permissions = (Map<String, Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(permissions.get(role), expectActions);
TopicName topicName = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace,
partitionedTopicName);
for (int i = 0; i < numPartitions; i++) {
TopicName partition = topicName.getPartition(i);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPermissionsOnTopic(response, testTenant, testNamespace,
partition.getEncodedLocalName());
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Map<String, Set<AuthAction>> partitionPermissions =
(Map<String, Set<AuthAction>>) responseCaptor.getValue();
Assert.assertEquals(partitionPermissions.get(role), expectActions);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,6 @@ public void testDeleteAuthenticationPoliciesOfTopic() throws Exception {
Awaitility.await().untilAsserted(() -> {
assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
.get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
for (int i = 0; i < numPartitions; i++) {
assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
.get().auth_policies.getTopicAuthentication()
.containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
}
});

admin.topics().deletePartitionedTopic("persistent://p1/ns1/partitioned-topic");
Expand Down

0 comments on commit e8a3011

Please sign in to comment.