Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Introduce resourcegroup rate limit on topic and geo #9

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public boolean resourceGroupExists(String resourceGroupName) throws MetadataStor
return exists(joinPath(BASE_PATH, resourceGroupName));
}

public CompletableFuture<Boolean> resourceGroupExistsAsync(String resourceGroupName) {
return existsAsync(joinPath(BASE_PATH, resourceGroupName));
}

public void createResourceGroup(String resourceGroupName, ResourceGroup rg) throws MetadataStoreException {
create(joinPath(BASE_PATH, resourceGroupName), rg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5267,4 +5267,40 @@ protected CompletableFuture<Void> internalSetSchemaCompatibilityStrategy(SchemaC
.updateTopicPoliciesAsync(topicName, topicPolicies);
}));
}

protected CompletableFuture<Void> internalSetResourceGroup(String resourceGroupName, boolean isGlobal) {
boolean isDelete = StringUtils.isEmpty(resourceGroupName);
return validateTopicPolicyOperationAsync(topicName, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE)
.thenCompose(__ -> {
if (isDelete) {
return CompletableFuture.completedFuture(true);
}
return resourceGroupResources().resourceGroupExistsAsync(resourceGroupName);
})
.thenCompose(exists -> {
if (!exists) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
"ResourceGroup does not exist"));
}
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setResourceGroupName(isDelete ? null : resourceGroupName);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
});
}

protected CompletableFuture<String> internalGetResourceGroup(boolean applied, boolean isGlobal) {
return validateTopicPolicyOperationAsync(topicName, PolicyName.RESOURCEGROUP, PolicyOperation.READ)
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getResourceGroupName)
.orElseGet(() -> {
if (applied) {
return getNamespacePolicies(namespaceName).resource_group_name;
}
return null;
})
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3928,5 +3928,61 @@ public void removeSchemaCompatibilityStrategy(
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/resourceGroup")
@ApiOperation(value = "Set ResourceGroup for a topic")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic doesn't exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")
})
public void setResourceGroup(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "ResourceGroup name", required = true) String resourceGroupName) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetResourceGroup(resourceGroupName, isGlobal))
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setResourceGroup", ex, asyncResponse);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/resourceGroup")
@ApiOperation(value = "Get ResourceGroup for a topic")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic doesn't exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")
liudezhi2098 marked this conversation as resolved.
Show resolved Hide resolved
})
public void getResourceGroup(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String encodedTopic,
@QueryParam("applied") @DefaultValue("false") boolean applied,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalGetResourceGroup(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getResourceGroup", ex, asyncResponse);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static class BytesAndMessagesCount {
public enum ResourceGroupMonitoringClass {
Publish,
Dispatch,
ReplicationDispatch,
// Storage; // Punt this for now, until we have a clearer idea of the usage, statistics, etc.
}

Expand All @@ -69,7 +70,7 @@ public enum ResourceGroupMonitoringClass {
public enum ResourceGroupRefTypes {
Tenants,
Namespaces,
// Topics; // Punt this for when we support direct ref/under from topics.
Topics
}

// Default ctor: it is not expected that anything outside of this package will need to directly
Expand All @@ -84,6 +85,13 @@ protected ResourceGroup(ResourceGroupService rgs, String name,
this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
log.info("attaching publish rate limiter {} to {} get {}", this.resourceGroupPublishLimiter.toString(), name,
this.getResourceGroupPublishLimiter());
this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager
.newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
log.info("attaching replication dispatch rate limiter {} to {}", this.resourceGroupReplicationDispatchLimiter,
name);
this.resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager
.newDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
nodece marked this conversation as resolved.
Show resolved Hide resolved
log.info("attaching topic dispatch rate limiter {} to {}", this.resourceGroupDispatchLimiter, name);
}

// ctor for overriding the transport-manager fill/set buffer.
Expand All @@ -97,6 +105,10 @@ protected ResourceGroup(ResourceGroupService rgs, String rgName,
this.setResourceGroupMonitoringClassFields();
this.setResourceGroupConfigParameters(rgConfig);
this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager
.newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager
.newDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor());
this.ruPublisher = rgPublisher;
this.ruConsumer = rgConsumer;
}
Expand All @@ -107,12 +119,15 @@ public ResourceGroup(ResourceGroup other) {
this.resourceGroupName = other.resourceGroupName;
this.rgs = other.rgs;
this.resourceGroupPublishLimiter = other.resourceGroupPublishLimiter;
this.resourceGroupReplicationDispatchLimiter = other.resourceGroupReplicationDispatchLimiter;
this.resourceGroupDispatchLimiter = other.resourceGroupDispatchLimiter;
this.setResourceGroupMonitoringClassFields();

// ToDo: copy the monitoring class fields, and ruPublisher/ruConsumer from other, if required.

this.resourceGroupNamespaceRefs = other.resourceGroupNamespaceRefs;
this.resourceGroupTenantRefs = other.resourceGroupTenantRefs;
this.resourceGroupTopicRefs = other.resourceGroupTopicRefs;

for (int idx = 0; idx < ResourceGroupMonitoringClass.values().length; idx++) {
PerMonitoringClassFields thisFields = this.monitoringClassFields[idx];
Expand Down Expand Up @@ -145,12 +160,19 @@ protected void updateResourceGroup(org.apache.pulsar.common.policies.data.Resour
pubBmc.messages = rgConfig.getPublishRateInMsgs();
pubBmc.bytes = rgConfig.getPublishRateInBytes();
this.resourceGroupPublishLimiter.update(pubBmc);
ResourceGroupRateLimiterManager
.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, rgConfig);
ResourceGroupRateLimiterManager.updateDispatchRateLimiter(resourceGroupDispatchLimiter, rgConfig);
}

protected long getResourceGroupNumOfNSRefs() {
return this.resourceGroupNamespaceRefs.size();
}

protected long getResourceGroupNumOfTopicRefs() {
return this.resourceGroupTopicRefs.size();
}

protected long getResourceGroupNumOfTenantRefs() {
return this.resourceGroupTenantRefs.size();
}
Expand All @@ -168,6 +190,9 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
case Namespaces:
set = this.resourceGroupNamespaceRefs;
break;
case Topics:
set = this.resourceGroupTopicRefs;
nodece marked this conversation as resolved.
Show resolved Hide resolved
break;
}

if (ref) {
Expand All @@ -178,7 +203,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
set.add(name);

// If this is the first ref, register with the transport manager.
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 1) {
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size()
+ this.resourceGroupTopicRefs.size() == 1) {
if (log.isDebugEnabled()) {
log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
}
Expand All @@ -193,7 +219,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes
set.remove(name);

// If this was the last ref, unregister from the transport manager.
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) {
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size()
+ this.resourceGroupTopicRefs.size() == 0) {
if (log.isDebugEnabled()) {
log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
}
Expand Down Expand Up @@ -221,6 +248,9 @@ public void rgFillResourceUsage(ResourceUsage resourceUsage) {
p = resourceUsage.setDispatch();
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p);

p = resourceUsage.setReplicationDispatch();
this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p);

// Punt storage for now.
}

Expand All @@ -234,6 +264,9 @@ public void rgResourceUsageListener(String broker, ResourceUsage resourceUsage)
p = resourceUsage.getDispatch();
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p, broker);

p = resourceUsage.getReplicationDispatch();
this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p, broker);

// Punt storage for now.
}

Expand Down Expand Up @@ -351,14 +384,6 @@ protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass

protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount newQuota) throws PulsarAdminException {
// Only the Publish side is functional now; add the Dispatch side code when the consume side is ready.
if (!ResourceGroupMonitoringClass.Publish.equals(monClass)) {
if (log.isDebugEnabled()) {
log.debug("Doing nothing for monClass={}; only Publish is functional", monClass);
}
return null;
}

this.checkMonitoringClass(monClass);
BytesAndMessagesCount oldBMCount;

Expand All @@ -367,7 +392,22 @@ protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass mo
oldBMCount = monEntity.quotaForNextPeriod;
try {
monEntity.quotaForNextPeriod = newQuota;
this.resourceGroupPublishLimiter.update(newQuota);
switch (monClass) {
case ReplicationDispatch:
ResourceGroupRateLimiterManager
.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, newQuota);
break;
case Publish:
this.resourceGroupPublishLimiter.update(newQuota);
break;
case Dispatch:
ResourceGroupRateLimiterManager.updateDispatchRateLimiter(resourceGroupDispatchLimiter, newQuota);
break;
default:
if (log.isDebugEnabled()) {
log.debug("Doing nothing for monClass={};", monClass);
}
}
} finally {
monEntity.localUsageStatsLock.unlock();
}
Expand Down Expand Up @@ -419,9 +459,16 @@ protected static BytesAndMessagesCount accumulateBMCount(BytesAndMessagesCount .
}

private void checkMonitoringClass(ResourceGroupMonitoringClass monClass) throws PulsarAdminException {
if (monClass != ResourceGroupMonitoringClass.Publish && monClass != ResourceGroupMonitoringClass.Dispatch) {
String errMesg = "Unexpected monitoring class: " + monClass;
throw new PulsarAdminException(errMesg);
switch (monClass) {
case Publish:
break;
case Dispatch:
break;
case ReplicationDispatch:
break;
default:
String errMesg = "Unexpected monitoring class: " + monClass;
throw new PulsarAdminException(errMesg);
}
}

Expand Down Expand Up @@ -566,6 +613,14 @@ private void setResourceGroupConfigParameters(org.apache.pulsar.common.policies.
? -1 : rgConfig.getDispatchRateInBytes();
this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getDispatchRateInMsgs() == null
? -1 : rgConfig.getDispatchRateInMsgs();

idx = ResourceGroupMonitoringClass.ReplicationDispatch.ordinal();
this.monitoringClassFields[idx]
.configValuesPerPeriod.bytes = rgConfig.getReplicationDispatchRateInBytes() == null
? -1 : rgConfig.getReplicationDispatchRateInBytes();
this.monitoringClassFields[idx]
.configValuesPerPeriod.messages = rgConfig.getReplicationDispatchRateInMsgs() == null
? -1 : rgConfig.getReplicationDispatchRateInMsgs();
}

private void setDefaultResourceUsageTransportHandlers() {
Expand Down Expand Up @@ -606,6 +661,7 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
// across all of its usage classes (publish/dispatch/...).
private Set<String> resourceGroupTenantRefs = ConcurrentHashMap.newKeySet();
private Set<String> resourceGroupNamespaceRefs = ConcurrentHashMap.newKeySet();
private Set<String> resourceGroupTopicRefs = ConcurrentHashMap.newKeySet();

// Blobs required for transport manager's resource-usage register/unregister ops.
ResourceUsageConsumer ruConsumer;
Expand Down Expand Up @@ -640,6 +696,12 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
@Getter
protected ResourceGroupPublishLimiter resourceGroupPublishLimiter;

@Getter
protected ResourceGroupDispatchLimiter resourceGroupReplicationDispatchLimiter;

@Getter
protected ResourceGroupDispatchLimiter resourceGroupDispatchLimiter;

protected static class PerMonitoringClassFields {
// This lock covers all the "local" counts (i.e., except for the per-broker usage stats).
Lock localUsageStatsLock;
Expand Down
Loading
Loading