From e90810659c19af83e2d3c207fb2d66600991421b Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 22 Feb 2024 00:44:05 +0800 Subject: [PATCH 1/5] [feat][broker] Add ResourceGroup management on the topic level Signed-off-by: Zixuan Liu --- .../resources/ResourceGroupResources.java | 4 + .../admin/impl/PersistentTopicsBase.java | 36 +++ .../broker/admin/v2/PersistentTopics.java | 56 +++++ .../broker/resourcegroup/ResourceGroup.java | 16 +- .../resourcegroup/ResourceGroupService.java | 130 ++++++++-- .../pulsar/broker/service/AbstractTopic.java | 46 +++- .../nonpersistent/NonPersistentTopic.java | 6 +- .../service/persistent/PersistentTopic.java | 16 +- .../broker/admin/AdminResourceGroupTest.java | 141 +++++++++++ .../ResourceGroupServiceTest.java | 25 +- ...GroupUsageAggregationOnTopicLevelTest.java | 228 ++++++++++++++++++ .../pulsar/client/admin/TopicPolicies.java | 58 +++++ .../admin/internal/TopicPoliciesImpl.java | 48 ++++ .../pulsar/admin/cli/CmdTopicPolicies.java | 59 +++++ .../policies/data/HierarchyTopicPolicies.java | 3 + .../common/policies/data/TopicPolicies.java | 1 + 16 files changed, 822 insertions(+), 51 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationOnTopicLevelTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java index 414bf4ffcfc35..e59f11e9d968b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java @@ -46,6 +46,10 @@ public boolean resourceGroupExists(String resourceGroupName) throws MetadataStor return exists(joinPath(BASE_PATH, resourceGroupName)); } + public CompletableFuture resourceGroupExistsAsync(String resourceGroupName) { + return existsAsync(joinPath(BASE_PATH, resourceGroupName)); + } + public void createResourceGroup(String resourceGroupName, ResourceGroup rg) throws MetadataStoreException { create(joinPath(BASE_PATH, resourceGroupName), rg); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f8899e4a8a3a6..e9aa9f8ef670d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -5267,4 +5267,40 @@ protected CompletableFuture internalSetSchemaCompatibilityStrategy(SchemaC .updateTopicPoliciesAsync(topicName, topicPolicies); })); } + + protected CompletableFuture internalSetResourceGroup(String resourceGroupName, boolean isGlobal) { + boolean isDelete = StringUtils.isEmpty(resourceGroupName); + return validateTopicPolicyOperationAsync(topicName, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE) + .thenCompose(__ -> { + if (isDelete) { + return CompletableFuture.completedFuture(true); + } + return resourceGroupResources().resourceGroupExistsAsync(resourceGroupName); + }) + .thenCompose(exists -> { + if (!exists) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + "ResourceGroup does not exist")); + } + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setResourceGroupName(isDelete ? null : resourceGroupName); + topicPolicies.setIsGlobal(isGlobal); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }); + }); + } + + protected CompletableFuture internalGetResourceGroup(boolean applied, boolean isGlobal) { + return validateTopicPolicyOperationAsync(topicName, PolicyName.RESOURCEGROUP, PolicyOperation.READ) + .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenApply(op -> op.map(TopicPolicies::getResourceGroupName) + .orElseGet(() -> { + if (applied) { + return getNamespacePolicies(namespaceName).resource_group_name; + } + return null; + }) + )); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 26889da82d417..c1826266b32c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -3928,5 +3928,61 @@ public void removeSchemaCompatibilityStrategy( }); } + @POST + @Path("/{tenant}/{namespace}/{topic}/resourceGroup") + @ApiOperation(value = "Set ResourceGroup for a topic") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic doesn't exist"), + @ApiResponse(code = 405, message = + "Topic level policy is disabled, enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification") + }) + public void setResourceGroup( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "ResourceGroup name", required = true) String resourceGroupName) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetResourceGroup(resourceGroupName, isGlobal)) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setResourceGroup", ex, asyncResponse); + return null; + }); + } + + @GET + @Path("/{tenant}/{namespace}/{topic}/resourceGroup") + @ApiOperation(value = "Get ResourceGroup for a topic") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic doesn't exist"), + @ApiResponse(code = 405, message = + "Topic level policy is disabled, enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification") + }) + public void getResourceGroup( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") String encodedTopic, + @QueryParam("applied") @DefaultValue("false") boolean applied, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetResourceGroup(applied, isGlobal)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getResourceGroup", ex, asyncResponse); + return null; + }); + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index 79db7da063034..d927b628e9f69 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -69,7 +69,7 @@ public enum ResourceGroupMonitoringClass { public enum ResourceGroupRefTypes { Tenants, Namespaces, - // Topics; // Punt this for when we support direct ref/under from topics. + Topics } // Default ctor: it is not expected that anything outside of this package will need to directly @@ -113,6 +113,7 @@ public ResourceGroup(ResourceGroup other) { this.resourceGroupNamespaceRefs = other.resourceGroupNamespaceRefs; this.resourceGroupTenantRefs = other.resourceGroupTenantRefs; + this.resourceGroupTopicRefs = other.resourceGroupTopicRefs; for (int idx = 0; idx < ResourceGroupMonitoringClass.values().length; idx++) { PerMonitoringClassFields thisFields = this.monitoringClassFields[idx]; @@ -151,6 +152,10 @@ protected long getResourceGroupNumOfNSRefs() { return this.resourceGroupNamespaceRefs.size(); } + protected long getResourceGroupNumOfTopicRefs() { + return this.resourceGroupTopicRefs.size(); + } + protected long getResourceGroupNumOfTenantRefs() { return this.resourceGroupTenantRefs.size(); } @@ -168,6 +173,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes case Namespaces: set = this.resourceGroupNamespaceRefs; break; + case Topics: + set = this.resourceGroupTopicRefs; } if (ref) { @@ -178,7 +185,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes set.add(name); // If this is the first ref, register with the transport manager. - if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 1) { + if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() + + this.resourceGroupTopicRefs.size() == 1) { if (log.isDebugEnabled()) { log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName); } @@ -193,7 +201,8 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes set.remove(name); // If this was the last ref, unregister from the transport manager. - if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) { + if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() + + this.resourceGroupTopicRefs.size() == 0) { if (log.isDebugEnabled()) { log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName); } @@ -606,6 +615,7 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { // across all of its usage classes (publish/dispatch/...). private Set resourceGroupTenantRefs = ConcurrentHashMap.newKeySet(); private Set resourceGroupNamespaceRefs = ConcurrentHashMap.newKeySet(); + private Set resourceGroupTopicRefs = ConcurrentHashMap.newKeySet(); // Blobs required for transport manager's resource-usage register/unregister ops. ResourceUsageConsumer ruConsumer; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index c74681fdb731a..9fcdb6790b30b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -167,9 +167,11 @@ public void resourceGroupDelete(String name) throws PulsarAdminException { long tenantRefCount = rg.getResourceGroupNumOfTenantRefs(); long nsRefCount = rg.getResourceGroupNumOfNSRefs(); - if ((tenantRefCount + nsRefCount) > 0) { + long topicRefCount = rg.getResourceGroupNumOfTopicRefs(); + if ((tenantRefCount + nsRefCount + topicRefCount) > 0) { String errMesg = "Resource group " + name + " still has " + tenantRefCount + " tenant refs"; errMesg += " and " + nsRefCount + " namespace refs on it"; + errMesg += " and " + topicRefCount + " topic refs on it"; throw new PulsarAdminException(errMesg); } @@ -293,6 +295,45 @@ public void unRegisterNameSpace(String resourceGroupName, NamespaceName fqNamesp rgNamespaceUnRegisters.labels(resourceGroupName).inc(); } + /** + * Registers a topic as a user of a resource group. + * + * @param resourceGroupName + * @param topicName complete topic name + */ + public void registerTopic(String resourceGroupName, TopicName topicName) { + ResourceGroup rg = resourceGroupsMap.get(resourceGroupName); + if (rg == null) { + throw new IllegalStateException("Resource group does not exist: " + resourceGroupName); + } + + ResourceGroupOpStatus status = rg.registerUsage(topicName.toString(), ResourceGroupRefTypes.Topics, + true, this.resourceUsageTransportManagerMgr); + if (status == ResourceGroupOpStatus.Exists) { + String msg = String.format("Topic %s already references the target resource group %s", + topicName, resourceGroupName); + throw new IllegalStateException(msg); + } + + // Associate this topic-name with the RG. + this.topicToRGsMap.put(topicName, rg); + rgTopicRegisters.labels(resourceGroupName).inc(); + } + + /** + * UnRegisters a topic from a resource group. + * + * @param topicName complete topic name + */ + public void unRegisterTopic(TopicName topicName) { + ResourceGroup remove = this.topicToRGsMap.remove(topicName); + if (remove != null) { + remove.registerUsage(topicName.toString(), ResourceGroupRefTypes.Topics, + false, this.resourceUsageTransportManagerMgr); + rgTopicUnRegisters.labels(remove.resourceGroupName).inc(); + } + } + /** * Return the resource group associated with a namespace. * @@ -303,6 +344,11 @@ public ResourceGroup getNamespaceResourceGroup(NamespaceName namespaceName) { return this.namespaceToRGsMap.get(namespaceName); } + @VisibleForTesting + public ResourceGroup getTopicResourceGroup(TopicName topicName) { + return this.topicToRGsMap.get(topicName); + } + @Override public void close() throws Exception { if (aggregateLocalUsagePeriodicTask != null) { @@ -318,8 +364,16 @@ public void close() throws Exception { topicConsumeStats.clear(); } + private void incrementUsage(ResourceGroup resourceGroup, + ResourceGroupMonitoringClass monClass, BytesAndMessagesCount incStats) + throws PulsarAdminException { + resourceGroup.incrementLocalUsageStats(monClass, incStats); + rgLocalUsageBytes.labels(resourceGroup.resourceGroupName, monClass.name()).inc(incStats.bytes); + rgLocalUsageMessages.labels(resourceGroup.resourceGroupName, monClass.name()).inc(incStats.messages); + } + /** - * Increments usage stats for the resource groups associated with the given namespace and tenant. + * Increments usage stats for the resource groups associated with the given namespace, tenant, and topic. * Expected to be called when a message is produced or consumed on a topic, or when we calculate * usage periodically in the background by going through broker-service stats. [Not yet decided * which model we will follow.] Broker-service stats will be cumulative, while calls from the @@ -327,22 +381,25 @@ public void close() throws Exception { * * If the tenant and NS are associated with different RGs, the statistics on both RGs are updated. * If the tenant and NS are associated with the same RG, the stats on the RG are updated only once + * If the tenant, NS and topic are associated with the same RG, the stats on the RG are updated only once * (to avoid a direct double-counting). * ToDo: will this distinction result in "expected semantics", or shock from users? * For now, the only caller is internal to this class. * + * @param topicName Complete topic name * @param tenantName - * @param nsName + * @param nsName Complete namespace name * @param monClass * @param incStats * @returns true if the stats were updated; false if nothing was updated. */ - protected boolean incrementUsage(String tenantName, String nsName, - ResourceGroupMonitoringClass monClass, - BytesAndMessagesCount incStats) throws PulsarAdminException { - final ResourceGroup nsRG = this.namespaceToRGsMap.get(NamespaceName.get(tenantName, nsName)); + protected boolean incrementUsage(String topicName, String tenantName, String nsName, + ResourceGroupMonitoringClass monClass, + BytesAndMessagesCount incStats) throws PulsarAdminException { + final ResourceGroup nsRG = this.namespaceToRGsMap.get(NamespaceName.get(nsName)); final ResourceGroup tenantRG = this.tenantToRGsMap.get(tenantName); - if (tenantRG == null && nsRG == null) { + final ResourceGroup topicRG = this.topicToRGsMap.get(TopicName.get(topicName)); + if (tenantRG == null && nsRG == null && topicRG == null) { return false; } @@ -353,24 +410,40 @@ protected boolean incrementUsage(String tenantName, String nsName, throw new PulsarAdminException(errMesg); } - if (nsRG == tenantRG) { + if (tenantRG == nsRG && nsRG == topicRG) { // Update only once in this case. - // Note that we will update both tenant and namespace RGs in other cases. - nsRG.incrementLocalUsageStats(monClass, incStats); - rgLocalUsageMessages.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.messages); - rgLocalUsageBytes.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.bytes); + // Note that we will update both tenant, namespace and topic RGs in other cases. + incrementUsage(tenantRG, monClass, incStats); + return true; + } + + if (tenantRG != null && tenantRG == nsRG) { + // Tenant and Namespace GRs are same. + incrementUsage(tenantRG, monClass, incStats); + if (topicRG == null) { + return true; + } + } + + if (nsRG != null && nsRG == topicRG) { + // Namespace and Topic GRs are same. + incrementUsage(nsRG, monClass, incStats); return true; } if (tenantRG != null) { - tenantRG.incrementLocalUsageStats(monClass, incStats); - rgLocalUsageMessages.labels(tenantRG.resourceGroupName, monClass.name()).inc(incStats.messages); - rgLocalUsageBytes.labels(tenantRG.resourceGroupName, monClass.name()).inc(incStats.bytes); + // Tenant GR is different from other resource GR. + incrementUsage(tenantRG, monClass, incStats); } + if (nsRG != null) { - nsRG.incrementLocalUsageStats(monClass, incStats); - rgLocalUsageMessages.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.messages); - rgLocalUsageBytes.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.bytes); + // Namespace GR is different from other resource GR. + incrementUsage(nsRG, monClass, incStats); + } + + if (topicRG != null) { + // Topic GR is different from other resource GR. + incrementUsage(topicRG, monClass, incStats); } return true; @@ -459,7 +532,7 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n } try { - boolean statsUpdated = this.incrementUsage(tenantString, nsString, monClass, bmDiff); + boolean statsUpdated = this.incrementUsage(topicName, tenantString, nsString, monClass, bmDiff); if (log.isDebugEnabled()) { log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; " + "by {} bytes, {} mesgs", @@ -550,14 +623,15 @@ protected void aggregateResourceGroupLocalUsages() { final TopicStats topicStats = entry.getValue(); final TopicName topic = TopicName.get(topicName); final String tenantString = topic.getTenant(); - final String nsString = topic.getNamespacePortion(); + final String nsString = topic.getNamespace(); final NamespaceName fqNamespace = topic.getNamespaceObject(); // Can't use containsKey here, as that checks for exact equality // (we need a check for string-comparison). val tenantRG = this.tenantToRGsMap.get(tenantString); val namespaceRG = this.namespaceToRGsMap.get(fqNamespace); - if (tenantRG == null && namespaceRG == null) { + val topicRG = this.topicToRGsMap.get(topic); + if (tenantRG == null && namespaceRG == null && topicRG == null) { // This topic's NS/tenant are not registered to any RG. continue; } @@ -745,6 +819,7 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie // Given a qualified NS-name (i.e., in "tenant/namespace" format), record its associated resource-group private ConcurrentHashMap namespaceToRGsMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap topicToRGsMap = new ConcurrentHashMap<>(); // Maps to maintain the usage per topic, in produce/consume directions. private ConcurrentHashMap topicProduceStats = new ConcurrentHashMap<>(); @@ -832,6 +907,17 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie .labelNames(resourceGroupLabel) .register(); + private static final Counter rgTopicRegisters = Counter.build() + .name("pulsar_resource_group_topic_registers") + .help("Number of registrations of topics") + .labelNames(resourceGroupLabel) + .register(); + private static final Counter rgTopicUnRegisters = Counter.build() + .name("pulsar_resource_group_topic_unregisters") + .help("Number of un-registrations of topics") + .labelNames(resourceGroupLabel) + .register(); + private static final Summary rgUsageAggregationLatency = Summary.build() .quantile(0.5, 0.05) .quantile(0.9, 0.01) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 7f5c30a729fe7..5c6657d4a7a0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -50,6 +50,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resourcegroup.ResourceGroup; import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupService; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; @@ -225,6 +226,7 @@ protected void updateTopicPolicy(TopicPolicies data) { topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate())); topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold()); topicPolicies.getDispatchRate().updateTopicValue(normalize(data.getDispatchRate())); + topicPolicies.getResourceGroupName().updateTopicValue(data.getResourceGroupName()); } protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { @@ -271,6 +273,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { brokerService.getPulsar().getConfig().getClusterName()); updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies); updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); + topicPolicies.getResourceGroupName().updateNamespaceValue(namespacePolicies.resource_group_name); } private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) { @@ -1120,31 +1123,54 @@ public void updateResourceGroupLimiter(Optional optPolicies) { updateResourceGroupLimiter(policies); } + /** + * @deprecated Use {@link #updateDispatchRateLimiter()} instead. + */ + @Deprecated public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) { requireNonNull(namespacePolicies); - // attach the resource-group level rate limiters, if set - String rgName = namespacePolicies.resource_group_name; + topicPolicies.getResourceGroupName().updateNamespaceValue(namespacePolicies.resource_group_name); + updateResourceGroupLimiter(); + } + + public void updateResourceGroupLimiter() { + String rgName = topicPolicies.getResourceGroupName().get(); if (rgName != null) { - final ResourceGroup resourceGroup = - brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); + ResourceGroupService resourceGroupService = brokerService.getPulsar().getResourceGroupServiceManager(); + final ResourceGroup resourceGroup = resourceGroupService.resourceGroupGet(rgName); if (resourceGroup != null) { + TopicName topicName = TopicName.get(topic); + resourceGroupService.unRegisterTopic(topicName); + String topicRg = topicPolicies.getResourceGroupName().getTopicValue(); + if (topicRg != null) { + try { + resourceGroupService.registerTopic(topicRg, topicName); + } catch (Exception e) { + log.error("Failed to register resource group {} for topic {}", rgName, topic); + return; + } + } this.resourceGroupRateLimitingEnabled = true; this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter(); this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead); log.info("Using resource group {} rate limiter for topic {}", rgName, topic); - return; } } else { - if (this.resourceGroupRateLimitingEnabled) { - this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName()); - this.resourceGroupPublishLimiter = null; - this.resourceGroupRateLimitingEnabled = false; - } + closeResourceGroupLimiter(); + /* Namespace detached from resource group. Enable the producer read */ enableProducerReadForPublishRateLimiting(); } } + protected void closeResourceGroupLimiter() { + if (resourceGroupRateLimitingEnabled) { + this.resourceGroupPublishLimiter = null; + this.resourceGroupRateLimitingEnabled = false; + brokerService.getPulsar().getResourceGroupServiceManager().unRegisterTopic(TopicName.get(topic)); + } + } + public long getMsgInCounter() { return this.msgInCounter.longValue(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index e3ecd39b71898..3f9f088baa583 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -170,7 +170,7 @@ public CompletableFuture initialize() { schemaValidationEnforced = policies.schema_validation_enforced; } updatePublishRateLimiter(); - updateResourceGroupLimiter(policies); + updateResourceGroupLimiter(); }); } @@ -444,6 +444,9 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c brokerService.executor().execute(() -> { brokerService.removeTopicFromCache(NonPersistentTopic.this); unregisterTopicPolicyListener(); + + closeResourceGroupLimiter(); + log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); }); @@ -511,6 +514,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect brokerService.executor().execute(() -> { brokerService.removeTopicFromCache(NonPersistentTopic.this); unregisterTopicPolicyListener(); + closeResourceGroupLimiter(); closeFuture.complete(null); }); }).exceptionally(exception -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cf6b82294dc02..148d5862e8c37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -319,7 +319,7 @@ public CompletableFuture initialize() { if (!optPolicies.isPresent()) { isEncryptionRequired = false; updatePublishRateLimiter(); - updateResourceGroupLimiter(new Policies()); + updateResourceGroupLimiter(); initializeDispatchRateLimiterIfNeeded(); updateSubscribeRateLimiter(); return; @@ -335,7 +335,7 @@ public CompletableFuture initialize() { updatePublishRateLimiter(); - updateResourceGroupLimiter(policies); + updateResourceGroupLimiter(); this.isEncryptionRequired = policies.encryption_required; @@ -1224,6 +1224,8 @@ public void deleteLedgerComplete(Object ctx) { unregisterTopicPolicyListener(); + closeResourceGroupLimiter(); + log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); } @@ -1342,6 +1344,9 @@ private void disposeTopic(CompletableFuture closeFuture) { subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); unregisterTopicPolicyListener(); + + closeResourceGroupLimiter(); + log.info("[{}] Topic closed", topic); cancelFencedTopicMonitoringTask(); closeFuture.complete(null); @@ -2457,7 +2462,7 @@ public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { // Apply policies for components. List> applyPolicyTasks = applyUpdatedTopicPolicies(); - applyPolicyTasks.add(applyUpdatedNamespacePolicies(data)); + applyPolicyTasks.add(applyUpdatedNamespacePolicies()); return FutureUtil.waitForAll(applyPolicyTasks) .thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic)) .exceptionally(ex -> { @@ -2466,8 +2471,8 @@ public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { }); } - private CompletableFuture applyUpdatedNamespacePolicies(Policies namespaceLevelPolicies) { - return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter(namespaceLevelPolicies)); + private CompletableFuture applyUpdatedNamespacePolicies() { + return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter()); } private List> applyUpdatedTopicPolicies() { @@ -2486,6 +2491,7 @@ private List> applyUpdatedTopicPolicies() { applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateDispatchRateLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscribeRateLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updatePublishRateLimiter())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscriptionsDispatcherRateLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java new file mode 100644 index 0000000000000..7c539e13044f8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import java.util.UUID; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ResourceGroup; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Unit test {@link AdminResource}. + */ +@Test(groups = "broker-admin") +public class AdminResourceGroupTest extends BrokerTestBase { + + @BeforeClass + @Override + public void setup() throws Exception { + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLevelPoliciesEnabled(true); + conf.setSystemTopicEnabled(true); + } + + @Test + public void testTopicResourceGroup() throws PulsarAdminException { + String topic = newTopicName(); + TopicName topicName = TopicName.get(topic); + + String resourceGroupName = "rg-topic-" + UUID.randomUUID(); + ResourceGroup resourceGroup = new ResourceGroup(); + resourceGroup.setPublishRateInMsgs(1000); + resourceGroup.setPublishRateInBytes(100000L); + resourceGroup.setDispatchRateInMsgs(2000); + resourceGroup.setDispatchRateInBytes(200000L); + admin.resourcegroups().createResourceGroup(resourceGroupName, resourceGroup); + + admin.topics().createNonPartitionedTopic(topic); + + admin.topicPolicies().setResourceGroup(topic, resourceGroupName); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topicPolicies().getResourceGroup(topic, true), resourceGroupName); + }); + + Awaitility.await().untilAsserted(() -> { + org.apache.pulsar.broker.resourcegroup.ResourceGroup rg = pulsar.getResourceGroupServiceManager() + .getTopicResourceGroup(topicName); + assertNotNull(rg); + assertEquals(rg.resourceGroupName, resourceGroupName); + }); + + admin.topicPolicies().removeResourceGroup(topic); + Awaitility.await().untilAsserted(() -> { + assertTrue(StringUtils.isEmpty(admin.topicPolicies().getResourceGroup(topic, true))); + org.apache.pulsar.broker.resourcegroup.ResourceGroup rg = pulsar.getResourceGroupServiceManager() + .getTopicResourceGroup(topicName); + assertNull(rg); + }); + } + + @Test + public void testTopicResourceGroupOverriderNamespaceResourceGroup() throws PulsarAdminException { + String namespaceResourceGroupName = "rg-ns-" + UUID.randomUUID(); + ResourceGroup namespaceResourceGroup = new ResourceGroup(); + namespaceResourceGroup.setPublishRateInMsgs(1001); + namespaceResourceGroup.setPublishRateInBytes(100001L); + namespaceResourceGroup.setDispatchRateInMsgs(2001); + namespaceResourceGroup.setDispatchRateInBytes(200001L); + admin.resourcegroups().createResourceGroup(namespaceResourceGroupName, namespaceResourceGroup); + + String topicResourceGroupName = "rg-topic-" + UUID.randomUUID(); + ResourceGroup topicResourceGroup = new ResourceGroup(); + topicResourceGroup.setPublishRateInMsgs(1000); + topicResourceGroup.setPublishRateInBytes(100000L); + topicResourceGroup.setDispatchRateInMsgs(2000); + topicResourceGroup.setDispatchRateInBytes(200000L); + admin.resourcegroups().createResourceGroup(topicResourceGroupName, topicResourceGroup); + + String topic = newTopicName(); + TopicName topicName = TopicName.get(topic); + String namespace = topicName.getNamespace(); + admin.namespaces().setNamespaceResourceGroup(namespace, namespaceResourceGroupName); + assertEquals(admin.namespaces().getNamespaceResourceGroup(namespace), namespaceResourceGroupName); + + admin.topics().createNonPartitionedTopic(topic); + admin.topicPolicies().setResourceGroup(topic, topicResourceGroupName); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topicPolicies() + .getResourceGroup(topic, true), topicResourceGroupName); + org.apache.pulsar.broker.resourcegroup.ResourceGroup rg = pulsar.getResourceGroupServiceManager() + .getTopicResourceGroup(topicName); + assertNotNull(rg); + assertEquals(rg.resourceGroupName, topicResourceGroupName); + }); + + admin.topicPolicies().removeResourceGroup(topic); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topicPolicies() + .getResourceGroup(topic, true), namespaceResourceGroupName); + org.apache.pulsar.broker.resourcegroup.ResourceGroup rg = pulsar.getResourceGroupServiceManager() + .getTopicResourceGroup(topicName); + assertNull(rg); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index 86dff398f9774..cb408bcbc0cf1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -18,10 +18,13 @@ */ package org.apache.pulsar.broker.resourcegroup; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; -import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; import org.apache.pulsar.broker.service.resource.usage.NetworkUsage; import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -30,13 +33,10 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; -import org.testng.Assert; import org.testng.annotations.Test; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; public class ResourceGroupServiceTest extends MockedPulsarServiceBaseTest { @BeforeClass @@ -111,16 +111,18 @@ public void measureOpsTime() throws PulsarAdminException { numPerfTestIterations, diffMsecs, (1000 * (float) diffMsecs)/numPerfTestIterations); // Going through the resource-group service - final String tenantName = "SomeTenant"; - final String namespaceName = "SomeNameSpace"; + final TopicName topicName = TopicName.get("SomeTenant/SomeNameSpace/my-topic"); + rgs.registerTopic(rgName, topicName); + final String tenantName = topicName.getTenant(); + final String namespaceName = topicName.getNamespace(); rgs.registerTenant(rgName, tenantName); - final NamespaceName tenantAndNamespaceName = NamespaceName.get(tenantName, namespaceName); + final NamespaceName tenantAndNamespaceName = topicName.getNamespaceObject(); rgs.registerNameSpace(rgName, tenantAndNamespaceName); mSecsStart = System.currentTimeMillis(); for (int ix = 0; ix < numPerfTestIterations; ix++) { for (int monClassIdx = 0; monClassIdx < ResourceGroupMonitoringClass.values().length; monClassIdx++) { monClass = ResourceGroupMonitoringClass.values()[monClassIdx]; - rgs.incrementUsage(tenantName, namespaceName, monClass, stats); + rgs.incrementUsage(topicName.toString(), tenantName, namespaceName, monClass, stats); } } mSecsEnd = System.currentTimeMillis(); @@ -129,6 +131,7 @@ public void measureOpsTime() throws PulsarAdminException { numPerfTestIterations, diffMsecs, (1000 * (float) diffMsecs)/numPerfTestIterations); rgs.unRegisterTenant(rgName, tenantName); rgs.unRegisterNameSpace(rgName, tenantAndNamespaceName); + rgs.unRegisterTopic(topicName); // The overhead of a RG lookup mSecsStart = System.currentTimeMillis(); @@ -197,6 +200,7 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep final NamespaceName tenantAndNamespace = NamespaceName.get(tenantName, namespaceName); rgs.registerNameSpace(rgName, tenantAndNamespace); + rgs.registerTopic(rgName, topic); // Delete of our valid config should throw until we unref correspondingly. Assert.assertThrows(PulsarAdminException.class, () -> rgs.resourceGroupDelete(rgName)); @@ -231,6 +235,7 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep rgs.unRegisterTenant(rgName, tenantName); rgs.unRegisterNameSpace(rgName, tenantAndNamespace); + rgs.unRegisterTopic(topic); BytesAndMessagesCount publishQuota = rgs.getPublishRateLimiters(rgName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationOnTopicLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationOnTopicLevelTest.java new file mode 100644 index 0000000000000..f5f615709ca11 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationOnTopicLevelTest.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import static org.testng.Assert.assertNotNull; +import com.google.common.collect.Sets; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class ResourceGroupUsageAggregationOnTopicLevelTest extends ProducerConsumerBase { + + private final String TenantName = "pulsar-test"; + private final String NsName = "test"; + private final String TenantAndNsName = TenantName + "/" + NsName; + private final String TestProduceConsumeTopicName = "/test/prod-cons-topic"; + private final String PRODUCE_CONSUME_PERSISTENT_TOPIC = "persistent://" + TenantAndNsName + TestProduceConsumeTopicName; + private final String PRODUCE_CONSUME_NON_PERSISTENT_TOPIC = + "non-persistent://" + TenantAndNsName + TestProduceConsumeTopicName; + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + this.conf.setAllowAutoTopicCreation(true); + + final String clusterName = "test"; + admin.clusters().createCluster(clusterName, ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); + admin.tenants().createTenant(TenantName, + new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), Sets.newHashSet(clusterName))); + admin.namespaces().createNamespace(TenantAndNsName); + admin.namespaces().setNamespaceReplicationClusters(TenantAndNsName, Sets.newHashSet(clusterName)); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testPersistentTopicProduceConsumeUsageOnRG() throws Exception { + testProduceConsumeUsageOnRG(PRODUCE_CONSUME_PERSISTENT_TOPIC); + } + + @Test + public void testNonPersistentTopicProduceConsumeUsageOnRG() throws Exception { + testProduceConsumeUsageOnRG(PRODUCE_CONSUME_NON_PERSISTENT_TOPIC); + } + + private void testProduceConsumeUsageOnRG(String topicString) throws Exception { + ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() { + @Override + public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes, + long currentMessagesUsed, long lastReportedMessages, + long lastReportTimeMSecsSinceEpoch) { + return false; + } + + @Override + public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) { + return 0; + } + }; + + @Cleanup + ResourceUsageTopicTransportManager transportMgr = new ResourceUsageTopicTransportManager(pulsar); + @Cleanup + ResourceGroupService rgs = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, transportMgr, + dummyQuotaCalc); + + String activeRgName = "runProduceConsume"; + ResourceGroup activeRG; + + ResourceUsagePublisher ruP = new ResourceUsagePublisher() { + @Override + public String getID() { + return rgs.resourceGroupGet(activeRgName).resourceGroupName; + } + + @Override + public void fillResourceUsage(ResourceUsage resourceUsage) { + rgs.resourceGroupGet(activeRgName).rgFillResourceUsage(resourceUsage); + } + }; + + ResourceUsageConsumer ruC = new ResourceUsageConsumer() { + @Override + public String getID() { + return rgs.resourceGroupGet(activeRgName).resourceGroupName; + } + + @Override + public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { + rgs.resourceGroupGet(activeRgName).rgResourceUsageListener(broker, resourceUsage); + } + }; + + org.apache.pulsar.common.policies.data.ResourceGroup rgConfig = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + rgConfig.setPublishRateInBytes(1500L); + rgConfig.setPublishRateInMsgs(100); + rgConfig.setDispatchRateInBytes(4000L); + rgConfig.setPublishRateInMsgs(500); + + rgs.resourceGroupCreate(activeRgName, rgConfig, ruP, ruC); + + activeRG = rgs.resourceGroupGet(activeRgName); + assertNotNull(activeRG); + + String subscriptionName = "my-subscription"; + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicString) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicString) + .create(); + + TopicName myTopic = TopicName.get(topicString); + rgs.unRegisterTopic(myTopic); + rgs.registerTopic(activeRgName,myTopic); + + final int NumMessagesToSend = 10; + int sentNumBytes = 0; + int sentNumMsgs = 0; + for (int ix = 0; ix < NumMessagesToSend; ix++) { + byte[] mesg = String.format("Hi, ix=%s", ix).getBytes(); + producer.send(mesg); + sentNumBytes += mesg.length; + sentNumMsgs++; + } + + this.verifyStats(rgs, topicString, activeRgName, sentNumBytes, sentNumMsgs, 0, 0, + true, false); + + int recvdNumBytes = 0; + int recvdNumMsgs = 0; + + Message message; + while (recvdNumMsgs < sentNumMsgs) { + message = consumer.receive(); + recvdNumBytes += message.getValue().length; + recvdNumMsgs++; + } + + this.verifyStats(rgs,topicString, activeRgName, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, + true, true); + } + + private void verifyStats(ResourceGroupService rgs, String topicString, String rgName, + int sentNumBytes, int sentNumMsgs, + int recvdNumBytes, int recvdNumMsgs, + boolean checkProduce, boolean checkConsume) throws PulsarAdminException { + BrokerService bs = pulsar.getBrokerService(); + Awaitility.await().untilAsserted(() -> { + TopicStatsImpl topicStats = bs.getTopicStats().get(topicString); + assertNotNull(topicStats); + if (checkProduce) { + Assert.assertTrue(topicStats.bytesInCounter >= sentNumBytes); + Assert.assertEquals(sentNumMsgs, topicStats.msgInCounter); + } + if (checkConsume) { + Assert.assertTrue(topicStats.bytesOutCounter >= recvdNumBytes); + Assert.assertEquals(recvdNumMsgs, topicStats.msgOutCounter); + } + }); + if (sentNumMsgs > 0 || recvdNumMsgs > 0) { + rgs.aggregateResourceGroupLocalUsages(); + BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish, + ResourceGroupUsageStatsType.Cumulative); + BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch, + ResourceGroupUsageStatsType.Cumulative); + + if (checkProduce) { + Assert.assertTrue(prodCounts.bytes >= sentNumBytes); + Assert.assertEquals(sentNumMsgs, prodCounts.messages); + } + if (checkConsume) { + Assert.assertTrue(consCounts.bytes >= recvdNumBytes); + Assert.assertEquals(recvdNumMsgs, consCounts.messages); + } + } + } +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java index 1dfb79d7ba0b7..644406fb25def 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; +import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; @@ -1718,4 +1720,60 @@ SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean * @param topic The topic in whose policy should be removed */ CompletableFuture removeSchemaCompatibilityStrategyAsync(String topic); + + /** + * Get the ResourceGroup for a topic. + * + * @param topic Topic name + * @param applied True gets namespace level configuration if ResourceGroup does not exist on the topic. + * False gets topic level configuration. + * @return ResourceGroup + * @throws NotAuthorizedException Don't have admin permission + * @throws NotFoundException Topic does not exist + * @throws PulsarAdminException Unexpected error + */ + String getResourceGroup(String topic, boolean applied) throws PulsarAdminException; + + /** + * Get the ResourceGroup for a topic asynchronously. + * + * @param topic Topic name + */ + CompletableFuture getResourceGroupAsync(String topic, boolean applied); + + /** + * Set the ResourceGroup for a topic. + * + * @param topic Topic name + * @param resourceGroupName ResourceGroup name + * @throws NotAuthorizedException Don't have admin permission + * @throws NotFoundException Topic does not exist + * @throws PulsarAdminException Unexpected error + */ + void setResourceGroup(String topic, String resourceGroupName) throws PulsarAdminException; + + /** + * Set the ResourceGroup for a topic. + * + * @param topic Topic name + * @param resourceGroupName ResourceGroup name + */ + CompletableFuture setResourceGroupAsync(String topic, String resourceGroupName); + + /** + * Remove the ResourceGroup on a topic. + * + * @param topic Topic name + * @throws NotAuthorizedException Don't have admin permission + * @throws NotFoundException Topic does not exist + * @throws PulsarAdminException Unexpected error + */ + void removeResourceGroup(String topic) throws PulsarAdminException; + + /** + * Remove the ResourceGroup on a topic asynchronously. + * + * @param topic Topic name + */ + CompletableFuture removeResourceGroupAsync(String topic); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java index bd3003778076d..60b49507e33fb 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java @@ -1420,6 +1420,54 @@ public CompletableFuture removeSchemaCompatibilityStrategyAsync(String top return asyncDeleteRequest(path); } + @Override + public String getResourceGroup(String topic, boolean applied) throws PulsarAdminException { + return sync(() -> getResourceGroupAsync(topic, applied)); + } + + @Override + public CompletableFuture getResourceGroupAsync(String topic, boolean applied) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "resourceGroup"); + path = path.queryParam("applied", applied); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(String rgName) { + future.complete(rgName); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public void setResourceGroup(String topic, String resourceGroupName) throws PulsarAdminException { + sync(() -> setResourceGroupAsync(topic, resourceGroupName)); + } + + @Override + public CompletableFuture setResourceGroupAsync(String topic, String resourceGroupName) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "resourceGroup"); + return asyncPostRequest(path, Entity.entity(resourceGroupName, MediaType.APPLICATION_JSON_TYPE)); + } + + @Override + public void removeResourceGroup(String topic) throws PulsarAdminException { + sync(() -> removeResourceGroupAsync(topic)); + } + + @Override + public CompletableFuture removeResourceGroupAsync(String topic) { + return setResourceGroupAsync(topic, null); + } + /* * returns topic name with encoded Local Name */ diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index 9bd6c18b4aec6..f718ba7877138 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -145,6 +145,10 @@ public CmdTopicPolicies(Supplier admin) { jcommander.addCommand("remove-schema-compatibility-strategy", new RemoveSchemaCompatibilityStrategy()); jcommander.addCommand("set-schema-compatibility-strategy", new SetSchemaCompatibilityStrategy()); jcommander.addCommand("get-schema-compatibility-strategy", new GetSchemaCompatibilityStrategy()); + + jcommander.addCommand("set-resource-group", new SetResourceGroup()); + jcommander.addCommand("get-resource-group", new GetResourceGroup()); + jcommander.addCommand("remove-resource-group", new RemoveResourceGroup()); } @Parameters(commandDescription = "Get max consumers per subscription for a topic") @@ -1742,6 +1746,61 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get ResourceGroup for a topic") + private class GetResourceGroup extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--applied", "-a"}, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + print(getTopicPolicies(isGlobal).getResourceGroup(topic, applied)); + } + } + + @Parameters(commandDescription = "Set ResourceGroup for a topic") + private class SetResourceGroup extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Parameter(names = {"--resource-group-name", "-rgn"}, description = "ResourceGroup name", required = true) + private String rgName; + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + getTopicPolicies(isGlobal).setResourceGroup(topic, rgName); + } + } + + @Parameters(commandDescription = "Remove ResourceGroup from a topic") + private class RemoveResourceGroup extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + + @Override + void run() throws PulsarAdminException { + String topic = validateTopicName(params); + getTopicPolicies(isGlobal).removeResourceGroup(topic); + } + } + private TopicPolicies getTopicPolicies(boolean isGlobal) { return getAdmin().topicPolicies(isGlobal); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index 0532744bec3e6..53b119c039c99 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -58,6 +58,8 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue schemaCompatibilityStrategy; final PolicyHierarchyValue dispatchRate; + final PolicyHierarchyValue resourceGroupName; + public HierarchyTopicPolicies() { replicationClusters = new PolicyHierarchyValue<>(); retentionPolicies = new PolicyHierarchyValue<>(); @@ -86,5 +88,6 @@ public HierarchyTopicPolicies() { subscriptionDispatchRate = new PolicyHierarchyValue<>(); schemaCompatibilityStrategy = new PolicyHierarchyValue<>(); dispatchRate = new PolicyHierarchyValue<>(); + resourceGroupName = new PolicyHierarchyValue<>(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 6e81509c83094..2ec4b18a1dbe1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -71,6 +71,7 @@ public class TopicPolicies { private Integer maxSubscriptionsPerTopic; private DispatchRateImpl replicatorDispatchRate; private SchemaCompatibilityStrategy schemaCompatibilityStrategy; + private String resourceGroupName; public boolean isGlobalPolicies() { return isGlobal != null && isGlobal; From 46556dfaf7220e9ec7149314a3a318a91f9de881 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 22 Feb 2024 01:53:17 +0800 Subject: [PATCH 2/5] [feat][broker] Add ResourceGroup-based dispatch rate limits to the Replicator Signed-off-by: Zixuan Liu --- .../broker/resourcegroup/ResourceGroup.java | 62 ++++++-- .../ResourceGroupDispatchLimiter.java | 150 ++++++++++++++++++ .../ResourceGroupRateLimiterManager.java | 47 ++++++ .../resourcegroup/ResourceGroupService.java | 31 +++- .../broker/service/AbstractReplicator.java | 3 + .../pulsar/broker/service/Replicator.java | 5 + .../persistent/PersistentReplicator.java | 70 ++++++-- .../src/main/proto/ResourceUsage.proto | 1 + .../ResourceGroupRateLimiterManagerTest.java | 109 +++++++++++++ .../ResourceGroupServiceTest.java | 11 ++ .../ResourceUsageTransportManagerTest.java | 8 +- .../service/ReplicatorRateLimiterTest.java | 92 +++++++++++ .../pulsar/client/admin/ResourceGroups.java | 4 + .../common/policies/data/ResourceGroup.java | 3 + .../pulsar/admin/cli/CmdResourceGroups.java | 20 +++ 15 files changed, 582 insertions(+), 34 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index d927b628e9f69..20e7b13590afb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -60,6 +60,7 @@ public static class BytesAndMessagesCount { public enum ResourceGroupMonitoringClass { Publish, Dispatch, + ReplicationDispatch, // Storage; // Punt this for now, until we have a clearer idea of the usage, statistics, etc. } @@ -69,7 +70,8 @@ public enum ResourceGroupMonitoringClass { public enum ResourceGroupRefTypes { Tenants, Namespaces, - Topics + Topics, + Replicators, } // Default ctor: it is not expected that anything outside of this package will need to directly @@ -84,6 +86,8 @@ protected ResourceGroup(ResourceGroupService rgs, String name, this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor()); log.info("attaching publish rate limiter {} to {} get {}", this.resourceGroupPublishLimiter.toString(), name, this.getResourceGroupPublishLimiter()); + this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager + .newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor()); } // ctor for overriding the transport-manager fill/set buffer. @@ -97,6 +101,8 @@ protected ResourceGroup(ResourceGroupService rgs, String rgName, this.setResourceGroupMonitoringClassFields(); this.setResourceGroupConfigParameters(rgConfig); this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor()); + this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager + .newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor()); this.ruPublisher = rgPublisher; this.ruConsumer = rgConsumer; } @@ -107,6 +113,7 @@ public ResourceGroup(ResourceGroup other) { this.resourceGroupName = other.resourceGroupName; this.rgs = other.rgs; this.resourceGroupPublishLimiter = other.resourceGroupPublishLimiter; + this.resourceGroupReplicationDispatchLimiter = other.resourceGroupReplicationDispatchLimiter; this.setResourceGroupMonitoringClassFields(); // ToDo: copy the monitoring class fields, and ruPublisher/ruConsumer from other, if required. @@ -146,6 +153,7 @@ protected void updateResourceGroup(org.apache.pulsar.common.policies.data.Resour pubBmc.messages = rgConfig.getPublishRateInMsgs(); pubBmc.bytes = rgConfig.getPublishRateInBytes(); this.resourceGroupPublishLimiter.update(pubBmc); + ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, rgConfig); } protected long getResourceGroupNumOfNSRefs() { @@ -230,6 +238,9 @@ public void rgFillResourceUsage(ResourceUsage resourceUsage) { p = resourceUsage.setDispatch(); this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p); + p = resourceUsage.setReplicationDispatch(); + this.setUsageInMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p); + // Punt storage for now. } @@ -243,6 +254,9 @@ public void rgResourceUsageListener(String broker, ResourceUsage resourceUsage) p = resourceUsage.getDispatch(); this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.Dispatch, p, broker); + p = resourceUsage.getReplicationDispatch(); + this.getUsageFromMonitoredEntity(ResourceGroupMonitoringClass.ReplicationDispatch, p, broker); + // Punt storage for now. } @@ -360,14 +374,6 @@ protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass, BytesAndMessagesCount newQuota) throws PulsarAdminException { - // Only the Publish side is functional now; add the Dispatch side code when the consume side is ready. - if (!ResourceGroupMonitoringClass.Publish.equals(monClass)) { - if (log.isDebugEnabled()) { - log.debug("Doing nothing for monClass={}; only Publish is functional", monClass); - } - return null; - } - this.checkMonitoringClass(monClass); BytesAndMessagesCount oldBMCount; @@ -376,7 +382,18 @@ protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass mo oldBMCount = monEntity.quotaForNextPeriod; try { monEntity.quotaForNextPeriod = newQuota; - this.resourceGroupPublishLimiter.update(newQuota); + switch (monClass) { + case ReplicationDispatch: + ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, newQuota); + break; + case Publish: + this.resourceGroupPublishLimiter.update(newQuota); + break; + default: + if (log.isDebugEnabled()) { + log.debug("Doing nothing for monClass={};", monClass); + } + } } finally { monEntity.localUsageStatsLock.unlock(); } @@ -428,9 +445,16 @@ protected static BytesAndMessagesCount accumulateBMCount(BytesAndMessagesCount . } private void checkMonitoringClass(ResourceGroupMonitoringClass monClass) throws PulsarAdminException { - if (monClass != ResourceGroupMonitoringClass.Publish && monClass != ResourceGroupMonitoringClass.Dispatch) { - String errMesg = "Unexpected monitoring class: " + monClass; - throw new PulsarAdminException(errMesg); + switch (monClass) { + case Publish: + break; + case Dispatch: + break; + case ReplicationDispatch: + break; + default: + String errMesg = "Unexpected monitoring class: " + monClass; + throw new PulsarAdminException(errMesg); } } @@ -575,6 +599,12 @@ private void setResourceGroupConfigParameters(org.apache.pulsar.common.policies. ? -1 : rgConfig.getDispatchRateInBytes(); this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getDispatchRateInMsgs() == null ? -1 : rgConfig.getDispatchRateInMsgs(); + + idx = ResourceGroupMonitoringClass.ReplicationDispatch.ordinal(); + this.monitoringClassFields[idx].configValuesPerPeriod.bytes = rgConfig.getReplicationDispatchRateInBytes() == null + ? -1 : rgConfig.getReplicationDispatchRateInBytes(); + this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getReplicationDispatchRateInMsgs() == null + ? -1 : rgConfig.getReplicationDispatchRateInMsgs(); } private void setDefaultResourceUsageTransportHandlers() { @@ -650,6 +680,12 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { @Getter protected ResourceGroupPublishLimiter resourceGroupPublishLimiter; + @Getter + protected ResourceGroupDispatchLimiter resourceGroupReplicationDispatchLimiter; + + @Getter + protected ResourceGroupDispatchLimiter resourceGroupTopicDispatchLimiter; + protected static class PerMonitoringClassFields { // This lock covers all the "local" counts (i.e., except for the per-broker usage stats). Lock localUsageStatsLock; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java new file mode 100644 index 0000000000000..104890fd8cd46 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.common.util.RateLimiter; + +public class ResourceGroupDispatchLimiter implements AutoCloseable { + + private final ScheduledExecutorService executorService; + private volatile RateLimiter dispatchRateLimiterOnMessage; + private volatile RateLimiter dispatchRateLimiterOnByte; + + public ResourceGroupDispatchLimiter(ScheduledExecutorService executorService, long dispatchRateInMsgs, long dispatchRateInBytes) { + this.executorService = executorService; + update(dispatchRateInMsgs, dispatchRateInBytes); + } + + public void update(long dispatchRateInMsgs, long dispatchRateInBytes) { + if (dispatchRateInMsgs > 0) { + if (dispatchRateLimiterOnMessage != null) { + this.dispatchRateLimiterOnMessage.setRate(dispatchRateInMsgs); + } else { + this.dispatchRateLimiterOnMessage = + RateLimiter.builder() + .scheduledExecutorService(executorService) + .permits(dispatchRateInMsgs) + .rateTime(1) + .timeUnit(TimeUnit.SECONDS) + .permitUpdater(null) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + } + } else { + if (this.dispatchRateLimiterOnMessage != null) { + this.dispatchRateLimiterOnMessage.close(); + this.dispatchRateLimiterOnMessage = null; + } + } + + if (dispatchRateInBytes > 0) { + if (dispatchRateLimiterOnByte != null) { + this.dispatchRateLimiterOnByte.setRate(dispatchRateInBytes); + } else { + this.dispatchRateLimiterOnByte = + RateLimiter.builder() + .scheduledExecutorService(executorService) + .permits(dispatchRateInBytes) + .rateTime(1) + .timeUnit(TimeUnit.SECONDS) + .permitUpdater(null) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + } + } else { + if (this.dispatchRateLimiterOnByte != null) { + this.dispatchRateLimiterOnByte.close(); + this.dispatchRateLimiterOnByte = null; + } + } + } + + /** + * returns available msg-permit if msg-dispatch-throttling is enabled else it returns -1. + * + * @return + */ + public long getAvailableDispatchRateLimitOnMsg() { + return dispatchRateLimiterOnMessage == null ? -1 : dispatchRateLimiterOnMessage.getAvailablePermits(); + } + + /** + * returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1. + * + * @return + */ + public long getAvailableDispatchRateLimitOnByte() { + return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits(); + } + + /** + * It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed. + * + * @param numberOfMessages + * @param byteSize + */ + public void consumeDispatchQuota(long numberOfMessages, long byteSize) { + if (numberOfMessages > 0 && dispatchRateLimiterOnMessage != null) { + dispatchRateLimiterOnMessage.tryAcquire(numberOfMessages); + } + if (byteSize > 0 && dispatchRateLimiterOnByte != null) { + dispatchRateLimiterOnByte.tryAcquire(byteSize); + } + } + + /** + * Checks if dispatch-rate limiting is enabled. + * + * @return + */ + public boolean isDispatchRateLimitingEnabled() { + return dispatchRateLimiterOnMessage != null || dispatchRateLimiterOnByte != null; + } + + public void close() { + if (dispatchRateLimiterOnMessage != null) { + dispatchRateLimiterOnMessage = null; + } + if (dispatchRateLimiterOnByte != null) { + dispatchRateLimiterOnByte = null; + } + } + + /** + * Get configured msg dispatch-throttling rate. Returns -1 if not configured + * + * @return + */ + public long getDispatchRateOnMsg() { + return dispatchRateLimiterOnMessage != null ? dispatchRateLimiterOnMessage.getRate() : -1; + } + + /** + * Get configured byte dispatch-throttling rate. Returns -1 if not configured + * + * @return + */ + public long getDispatchRateOnByte() { + return dispatchRateLimiterOnByte != null ? dispatchRateLimiterOnByte.getRate() : -1; + } + + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java new file mode 100644 index 0000000000000..e1bc7dafe25f6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; + +public class ResourceGroupRateLimiterManager { + + static ResourceGroupDispatchLimiter newReplicationDispatchRateLimiter( + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup, + ScheduledExecutorService executorService) { + long msgs = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInMsgs()).orElse(-1L); + long bytes = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInBytes()).orElse(-1L); + return new ResourceGroupDispatchLimiter(executorService, msgs, bytes); + } + + static void updateReplicationDispatchRateLimiter( + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter, + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup) { + long msgs = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInMsgs()).orElse(-1L); + long bytes = Optional.ofNullable(resourceGroup.getReplicationDispatchRateInBytes()).orElse(-1L); + resourceGroupDispatchLimiter.update(msgs, bytes); + } + + static void updateReplicationDispatchRateLimiter(ResourceGroupDispatchLimiter resourceGroupDispatchLimiter, + BytesAndMessagesCount quota) { + resourceGroupDispatchLimiter.update(quota.messages, quota.bytes); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index 9fcdb6790b30b..d642a9d96e2a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -494,8 +494,9 @@ private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdmin // Find the difference between the last time stats were updated for this topic, and the current // time. If the difference is positive, update the stats. - private void updateStatsWithDiff(String topicName, String tenantString, String nsString, - long accByteCount, long accMesgCount, ResourceGroupMonitoringClass monClass) { + @VisibleForTesting + protected void updateStatsWithDiff(String topicName, String replicationRemoteCluster, String tenantString, String nsString, + long accByteCount, long accMesgCount, ResourceGroupMonitoringClass monClass) { ConcurrentHashMap hm; switch (monClass) { default: @@ -509,6 +510,10 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n case Dispatch: hm = this.topicConsumeStats; break; + + case ReplicationDispatch: + hm = this.replicationDispatchStats; + break; } BytesAndMessagesCount bmDiff = new BytesAndMessagesCount(); @@ -518,7 +523,13 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n bmNewCount.bytes = accByteCount; bmNewCount.messages = accMesgCount; - bmOldCount = hm.get(topicName); + String key; + if (monClass == ResourceGroupMonitoringClass.ReplicationDispatch) { + key = topicName + replicationRemoteCluster; + } else { + key = topicName; + } + bmOldCount = hm.get(key); if (bmOldCount == null) { bmDiff.bytes = bmNewCount.bytes; bmDiff.messages = bmNewCount.messages; @@ -539,7 +550,7 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n topicName, monClass, statsUpdated, tenantString, nsString, bmDiff.bytes, bmDiff.messages); } - hm.put(topicName, bmNewCount); + hm.put(key, bmNewCount); } catch (Throwable t) { log.error("updateStatsWithDiff: got ex={} while aggregating for {} side", t.getMessage(), monClass); @@ -636,10 +647,17 @@ protected void aggregateResourceGroupLocalUsages() { continue; } - this.updateStatsWithDiff(topicName, tenantString, nsString, + topicStats.getReplication().forEach((remoteCluster, stats) -> { + this.updateStatsWithDiff(topicName, remoteCluster, tenantString, nsString, + (long) stats.getMsgThroughputOut(), + (long) stats.getMsgRateOut(), + ResourceGroupMonitoringClass.ReplicationDispatch + ); + }); + this.updateStatsWithDiff(topicName, null, tenantString, nsString, topicStats.getBytesInCounter(), topicStats.getMsgInCounter(), ResourceGroupMonitoringClass.Publish); - this.updateStatsWithDiff(topicName, tenantString, nsString, + this.updateStatsWithDiff(topicName, null, tenantString, nsString, topicStats.getBytesOutCounter(), topicStats.getMsgOutCounter(), ResourceGroupMonitoringClass.Dispatch); } @@ -824,6 +842,7 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie // Maps to maintain the usage per topic, in produce/consume directions. private ConcurrentHashMap topicProduceStats = new ConcurrentHashMap<>(); private ConcurrentHashMap topicConsumeStats = new ConcurrentHashMap<>(); + private ConcurrentHashMap replicationDispatchStats = new ConcurrentHashMap<>(); // The task that periodically re-calculates the quota budget for local usage. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 50408b50632e1..65de05e38175a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -45,6 +45,7 @@ public abstract class AbstractReplicator { protected final String remoteCluster; protected final PulsarClientImpl replicationClient; protected final PulsarClientImpl client; + protected String replicatorId; protected volatile ProducerImpl producer; public static final String REPL_PRODUCER_NAME_DELIMITER = "-->"; @@ -80,6 +81,8 @@ public AbstractReplicator(Topic localTopic, String replicatorPrefix, String loca this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); + this.replicatorId = String.format("%s | %s", topicName, localCluster + "-->" + remoteCluster); + this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) // .topic(topicName) .messageRoutingMode(MessageRoutingMode.SinglePartition) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index eea90efb88371..177d95b6805bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -20,6 +20,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; @@ -48,5 +49,9 @@ default Optional getRateLimiter() { return Optional.empty(); } + default Optional getResourceGroupDispatchRateLimiter() { + return Optional.empty(); + } + boolean isConnected(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 4de9dec171769..2d7a7cb0ba7e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -44,6 +44,9 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupService; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; @@ -57,6 +60,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.SendCallback; import org.apache.pulsar.common.api.proto.MarkerType; +import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.stats.Rate; @@ -71,6 +75,7 @@ public class PersistentReplicator extends AbstractReplicator protected final ManagedCursor cursor; private Optional dispatchRateLimiter = Optional.empty(); + protected Optional resourceGroupDispatchRateLimiter = Optional.empty(); private final Object dispatchRateLimiterLock = new Object(); private int readBatchSize; @@ -195,29 +200,53 @@ private int getAvailablePermits() { return 0; } + long availablePermitsOnMsg = -1; + // handle rate limit if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) { DispatchRateLimiter rateLimiter = dispatchRateLimiter.get(); - // no permits from rate limit - if (!rateLimiter.hasMessageDispatchPermit()) { + // if dispatch-rate is in msg then read only msg according to available permit + availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg(); + long availablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte(); + if (availablePermitsOnByte == 0 || availablePermitsOnMsg == 0) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] message-read exceeded topic replicator message-rate {}/{}," + log.debug("[{}] message-read exceeded topic replicator message-rate {}/{}," + " schedule after a {}", - topicName, localCluster, remoteCluster, + replicatorId, rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(), MESSAGE_RATE_BACKOFF_MS); } return -1; } + } - // if dispatch-rate is in msg then read only msg according to available permit - long availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg(); - if (availablePermitsOnMsg > 0) { - availablePermits = Math.min(availablePermits, (int) availablePermitsOnMsg); + if (resourceGroupDispatchRateLimiter.isPresent()) { + ResourceGroupDispatchLimiter rateLimiter = resourceGroupDispatchRateLimiter.get(); + long rgAvailablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg(); + long rgAvailablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte(); + if (rgAvailablePermitsOnMsg == 0 || rgAvailablePermitsOnByte == 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] message-read exceeded resourcegroup message-rate {}/{}," + + " schedule after a {}", + replicatorId, + rateLimiter.getDispatchRateOnMsg(), + rateLimiter.getDispatchRateOnByte(), + MESSAGE_RATE_BACKOFF_MS); + } + return -1; + } + if (availablePermitsOnMsg == -1) { + availablePermitsOnMsg = rgAvailablePermitsOnMsg; + } else { + availablePermitsOnMsg = Math.min(rgAvailablePermitsOnMsg, availablePermitsOnMsg); } } + if (availablePermitsOnMsg > 0) { + availablePermits = Math.min(availablePermits, (int) availablePermitsOnMsg); + } + return availablePermits; } @@ -751,6 +780,11 @@ public Optional getRateLimiter() { return dispatchRateLimiter; } + @Override + public Optional getResourceGroupDispatchRateLimiter() { + return resourceGroupDispatchRateLimiter; + } + @Override public void initializeDispatchRateLimiterIfNeeded() { synchronized (dispatchRateLimiterLock) { @@ -758,13 +792,21 @@ public void initializeDispatchRateLimiterIfNeeded() { && DispatchRateLimiter.isDispatchRateEnabled(topic.getReplicatorDispatchRate())) { this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.REPLICATOR)); } - } - } - @Override - public void updateRateLimiter() { - initializeDispatchRateLimiterIfNeeded(); - dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate); + ResourceGroupService resourceGroupService = brokerService.getPulsar().getResourceGroupServiceManager(); + HierarchyTopicPolicies hierarchyTopicPolicies = topic.getHierarchyTopicPolicies(); + String resourceGroupName = hierarchyTopicPolicies.getResourceGroupName().get(); + if (resourceGroupName != null) { + ResourceGroup resourceGroup = resourceGroupService.resourceGroupGet(resourceGroupName); + if (resourceGroup != null) { + resourceGroupDispatchRateLimiter = Optional.of(resourceGroup.getResourceGroupReplicationDispatchLimiter()); + } + } else { + if (resourceGroupDispatchRateLimiter.isPresent()) { + resourceGroupDispatchRateLimiter = Optional.empty(); + } + } + } } private void checkReplicatedSubscriptionMarker(Position position, MessageImpl msg, ByteBuf payload) { diff --git a/pulsar-broker/src/main/proto/ResourceUsage.proto b/pulsar-broker/src/main/proto/ResourceUsage.proto index 4706c9dfbcd19..ac7e1dea23457 100644 --- a/pulsar-broker/src/main/proto/ResourceUsage.proto +++ b/pulsar-broker/src/main/proto/ResourceUsage.proto @@ -38,6 +38,7 @@ message ResourceUsage { optional NetworkUsage publish = 2; optional NetworkUsage dispatch = 3; optional StorageUsage storage = 4; + optional NetworkUsage replicationDispatch = 6; } message ResourceUsageInfo { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java new file mode 100644 index 0000000000000..fc9862456d41a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import lombok.Cleanup; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; +import org.testng.annotations.Test; + +public class ResourceGroupRateLimiterManagerTest { + + @Test + public void testNewReplicationDispatchRateLimiterWithEmptyResourceGroup() { + org.apache.pulsar.common.policies.data.ResourceGroup emptyResourceGroup = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + + @Cleanup(value = "shutdown") + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + @Cleanup + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager.newReplicationDispatchRateLimiter(emptyResourceGroup,executorService); + assertFalse(resourceGroupDispatchLimiter.isDispatchRateLimitingEnabled()); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnMsg(), -1L); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), -1L); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnByte(), -1L); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), -1L); + } + + @Test + public void testReplicationDispatchRateLimiterOnMsgs() { + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + resourceGroup.setReplicationDispatchRateInMsgs(10L); + @Cleanup(value = "shutdown") + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + @Cleanup + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager.newReplicationDispatchRateLimiter(resourceGroup,executorService); + assertTrue(resourceGroupDispatchLimiter.isDispatchRateLimitingEnabled()); + + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnMsg(), resourceGroup.getReplicationDispatchRateInMsgs().longValue()); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), resourceGroup.getReplicationDispatchRateInMsgs().longValue()); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnByte(), -1L); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), -1L); + } + + @Test + public void testReplicationDispatchRateLimiterOnBytes() { + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + resourceGroup.setReplicationDispatchRateInBytes(20L); + @Cleanup(value = "shutdown") + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + @Cleanup + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager.newReplicationDispatchRateLimiter(resourceGroup,executorService); + assertTrue(resourceGroupDispatchLimiter.isDispatchRateLimitingEnabled()); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnMsg(), -1L); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), -1L); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnByte(), resourceGroup.getReplicationDispatchRateInBytes().longValue()); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), resourceGroup.getReplicationDispatchRateInBytes().longValue()); + } + + @Test + public void testUpdateReplicationDispatchRateLimiter() { + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + resourceGroup.setReplicationDispatchRateInMsgs(10L); + resourceGroup.setReplicationDispatchRateInBytes(100L); + + @Cleanup(value = "shutdown") + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + @Cleanup + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager.newReplicationDispatchRateLimiter(resourceGroup,executorService); + + BytesAndMessagesCount quota = new BytesAndMessagesCount(); + quota.messages = 20; + quota.bytes = 200; + ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupDispatchLimiter, quota); + + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnByte(), quota.bytes); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), quota.bytes); + assertEquals(resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnMsg(), quota.messages); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), quota.messages); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index cb408bcbc0cf1..5d4bce643e6df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -156,6 +156,8 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep rgConfig.setPublishRateInMsgs(100); rgConfig.setDispatchRateInBytes(40000L); rgConfig.setDispatchRateInMsgs(500); + rgConfig.setReplicationDispatchRateInBytes(2000L); + rgConfig.setReplicationDispatchRateInMsgs(400L); int initialNumQuotaCalculations = numAnonymousQuotaCalculations; rgs.resourceGroupCreate(rgName, rgConfig); @@ -170,6 +172,8 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep rgConfig.setPublishRateInMsgs(rgConfig.getPublishRateInMsgs()*10); rgConfig.setDispatchRateInBytes(rgConfig.getDispatchRateInBytes()/10); rgConfig.setDispatchRateInMsgs(rgConfig.getDispatchRateInMsgs()/10); + rgConfig.setReplicationDispatchRateInBytes(rgConfig.getReplicationDispatchRateInBytes()/10); + rgConfig.setReplicationDispatchRateInMsgs(rgConfig.getReplicationDispatchRateInMsgs()/10); rgs.resourceGroupUpdate(rgName, rgConfig); Assert.assertEquals(rgs.getNumResourceGroups(), 1); @@ -187,6 +191,9 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep monClassFields = retRG.monitoringClassFields[ResourceGroupMonitoringClass.Dispatch.ordinal()]; Assert.assertEquals(monClassFields.configValuesPerPeriod.bytes, rgConfig.getDispatchRateInBytes().longValue()); Assert.assertEquals(monClassFields.configValuesPerPeriod.messages, rgConfig.getDispatchRateInMsgs().intValue()); + monClassFields = retRG.monitoringClassFields[ResourceGroupMonitoringClass.ReplicationDispatch.ordinal()]; + Assert.assertEquals(monClassFields.configValuesPerPeriod.bytes, rgConfig.getReplicationDispatchRateInBytes().longValue()); + Assert.assertEquals(monClassFields.configValuesPerPeriod.messages, rgConfig.getReplicationDispatchRateInMsgs().intValue()); Assert.assertThrows(PulsarAdminException.class, () -> rgs.resourceGroupDelete(randomRgName)); @@ -217,6 +224,10 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep // Gross hack! if (monClass == ResourceGroupMonitoringClass.Publish) { nwUsage = usage.setPublish(); + } else if (monClass == ResourceGroupMonitoringClass.Dispatch) { + nwUsage = usage.setDispatch(); + } else if (monClass == ResourceGroupMonitoringClass.ReplicationDispatch) { + nwUsage = usage.setReplicationDispatch(); } else { nwUsage = usage.setDispatch(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java index 4e54d1b3326ca..bcc663342baa8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java @@ -79,7 +79,7 @@ public void fillResourceUsage(ResourceUsage resourceUsage) { resourceUsage.setOwner(getID()); resourceUsage.setPublish().setMessagesPerPeriod(1000).setBytesPerPeriod(10001); resourceUsage.setStorage().setTotalBytes(500003); - + resourceUsage.setReplicationDispatch().setMessagesPerPeriod(2000).setBytesPerPeriod(4000); } }; @@ -98,6 +98,10 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { p.setBytesPerPeriod(resourceUsage.getPublish().getBytesPerPeriod()); p.setMessagesPerPeriod(resourceUsage.getPublish().getMessagesPerPeriod()); + p = recvdUsage.setReplicationDispatch(); + p.setBytesPerPeriod(resourceUsage.getReplicationDispatch().getBytesPerPeriod()); + p.setMessagesPerPeriod(resourceUsage.getReplicationDispatch().getMessagesPerPeriod()); + recvdUsage.setStorage().setTotalBytes(resourceUsage.getStorage().getTotalBytes()); } }; @@ -112,6 +116,8 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { assertNotNull(recvdUsage.getStorage()); assertEquals(recvdUsage.getPublish().getBytesPerPeriod(), 10001); assertEquals(recvdUsage.getStorage().getTotalBytes(), 500003); + assertEquals(recvdUsage.getReplicationDispatch().getBytesPerPeriod(), 4000); + assertEquals(recvdUsage.getReplicationDispatch().getMessagesPerPeriod(), 2000); } private void prepareData() throws PulsarServerException, PulsarAdminException, PulsarClientException { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index fdf27adc71875..5b7ea15409757 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -19,20 +19,24 @@ package org.apache.pulsar.broker.service; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertFalse; import com.google.common.collect.Sets; import java.lang.reflect.Method; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.ResourceGroup; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,6 +102,7 @@ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception { // rate limiter disable by default assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertFalse(topic.getReplicators().values().get(0).getResourceGroupDispatchRateLimiter().isPresent()); //set topic-level policy, which should take effect DispatchRate topicRate = DispatchRate.builder() @@ -119,6 +124,24 @@ public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception { assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1); assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), -1L); + + // ResourceGroupDispatchRateLimiter + String resourceGroupName = UUID.randomUUID().toString(); + ResourceGroup resourceGroup = new ResourceGroup(); + resourceGroup.setReplicationDispatchRateInBytes(10L); + resourceGroup.setReplicationDispatchRateInMsgs(20L); + admin1.resourcegroups().createResourceGroup(resourceGroupName, resourceGroup); + Awaitility.await().untilAsserted(() -> assertNotNull(admin1.resourcegroups() + .getResourceGroup(resourceGroupName))); + admin1.topicPolicies().setResourceGroup(topicName, resourceGroupName); + + Replicator replicator = topic.getReplicators().values().get(0); + Awaitility.await().untilAsserted(() -> { + assertTrue(replicator.getResourceGroupDispatchRateLimiter().isPresent()); + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = replicator.getResourceGroupDispatchRateLimiter().get(); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), resourceGroup.getReplicationDispatchRateInBytes().longValue()); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), resourceGroup.getReplicationDispatchRateInMsgs().longValue()); + }); } @Test @@ -142,6 +165,7 @@ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception { // rate limiter disable by default assertFalse(topic.getReplicators().values().get(0).getRateLimiter().isPresent()); + assertFalse(topic.getReplicators().values().get(0).getResourceGroupDispatchRateLimiter().isPresent()); //set namespace-level policy, which should take effect DispatchRate topicRate = DispatchRate.builder() @@ -163,6 +187,24 @@ public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception { assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), -1); assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), -1L); + + // ResourceGroupDispatchRateLimiter + String resourceGroupName = UUID.randomUUID().toString(); + ResourceGroup resourceGroup = new ResourceGroup(); + resourceGroup.setReplicationDispatchRateInBytes(10L); + resourceGroup.setReplicationDispatchRateInMsgs(20L); + admin1.resourcegroups().createResourceGroup(resourceGroupName, resourceGroup); + Awaitility.await().untilAsserted(() -> assertNotNull(admin1.resourcegroups() + .getResourceGroup(resourceGroupName))); + admin1.namespaces().setNamespaceResourceGroup(namespace, resourceGroupName); + + Replicator replicator = topic.getReplicators().values().get(0); + Awaitility.await().untilAsserted(() -> { + assertTrue(replicator.getResourceGroupDispatchRateLimiter().isPresent()); + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = replicator.getResourceGroupDispatchRateLimiter().get(); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), resourceGroup.getReplicationDispatchRateInBytes().longValue()); + assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), resourceGroup.getReplicationDispatchRateInMsgs().longValue()); + }); } @Test @@ -547,5 +589,55 @@ public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Excepti producer.close(); } + @Test + public void testResourceGroupReplicatorRateLimiter() throws Exception { + final String namespace = "pulsar/replicatormsg-" + System.currentTimeMillis(); + final String topicName = "persistent://" + namespace + "/" + UUID.randomUUID(); + + admin1.namespaces().createNamespace(namespace); + // 0. set 2 clusters, there will be 1 replicator in each topic + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + // ResourceGroupDispatchRateLimiter + int messageRate = 100; + String resourceGroupName = UUID.randomUUID().toString(); + ResourceGroup resourceGroup = new ResourceGroup(); + resourceGroup.setReplicationDispatchRateInMsgs((long) messageRate); + admin1.resourcegroups().createResourceGroup(resourceGroupName, resourceGroup); + Awaitility.await().untilAsserted(() -> assertNotNull(admin1.resourcegroups() + .getResourceGroup(resourceGroupName))); + admin1.namespaces().setNamespaceResourceGroup(namespace, resourceGroupName); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + @Cleanup + Producer producer = client1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + final AtomicInteger totalReceived = new AtomicInteger(0); + + @Cleanup + Consumer consumer = client2.newConsumer().topic(topicName).subscriptionName("sub2-in-cluster2").messageListener((c1, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", receivedMessage); + totalReceived.incrementAndGet(); + }).subscribe(); + + int numMessages = 500; + for (int i = 0; i < numMessages; i++) { + producer.send(new byte[80]); + } + + Assert.assertTrue(totalReceived.get() < messageRate * 2); + } + private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java index bbed3dbb9b825..b023e9329301b 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java @@ -75,6 +75,8 @@ public interface ResourceGroups { * "PublishRateInBytes" : "value", * "DispatchRateInMsgs" : "value", * "DispatchRateInBytes" : "value" + * "ReplicationDispatchRateInMsgs" : "value" + * "ReplicationDispatchRateInBytes" : "value" * * * @@ -101,6 +103,8 @@ public interface ResourceGroups { * "PublishRateInBytes" : "value", * "DispatchRateInMsgs" : "value", * "DspatchRateInBytes" : "value" + * "ReplicationDispatchRateInMsgs" : "value" + * "ReplicationDispatchRateInBytes" : "value" * * * diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java index 0bfc38637c10f..3cef555f1981e 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java @@ -26,4 +26,7 @@ public class ResourceGroup { private Long publishRateInBytes; private Integer dispatchRateInMsgs; private Long dispatchRateInBytes; + + private Long replicationDispatchRateInMsgs; + private Long replicationDispatchRateInBytes; } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java index afc9fc1c3aba7..3d44e30ee4293 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java @@ -73,6 +73,14 @@ private class Create extends CliCommand { + "(default -1 will be overwrite if not passed)", required = false) private Long dispatchRateInBytes; + @Parameter(names = {"--replication-msg-dispatch-rate"}, description = "replication-msg-dispatch-rate " + + "(default -1 will be overwrite if not passed)", required = false) + private Long replicationDispatchRateInMsgs; + + @Parameter(names = {"--replication-byte-dispatch-rate"}, description = "replication-byte-dispatch-rate " + + "(default -1 will be overwrite if not passed)", required = false) + private Long replicationDispatchRateInBytes; + @Override void run() throws PulsarAdminException { String name = getOneArgument(params); @@ -82,6 +90,8 @@ void run() throws PulsarAdminException { resourcegroup.setDispatchRateInBytes(dispatchRateInBytes); resourcegroup.setPublishRateInMsgs(publishRateInMsgs); resourcegroup.setPublishRateInBytes(publishRateInBytes); + resourcegroup.setReplicationDispatchRateInMsgs(replicationDispatchRateInMsgs); + resourcegroup.setReplicationDispatchRateInBytes(replicationDispatchRateInBytes); getAdmin().resourcegroups().createResourceGroup(name, resourcegroup); } } @@ -108,6 +118,14 @@ private class Update extends CliCommand { "-bd" }, description = "byte-dispatch-rate ", required = false) private Long dispatchRateInBytes; + @Parameter(names = {"--replication-msg-dispatch-rate"}, description = "replication-msg-dispatch-rate " + + "(default -1 will be overwrite if not passed)", required = false) + private Long replicationDispatchRateInMsgs; + + @Parameter(names = {"--replication-byte-dispatch-rate"}, description = "replication-byte-dispatch-rate " + + "(default -1 will be overwrite if not passed)", required = false) + private Long replicationDispatchRateInBytes; + @Override void run() throws PulsarAdminException { String name = getOneArgument(params); @@ -117,6 +135,8 @@ void run() throws PulsarAdminException { resourcegroup.setDispatchRateInBytes(dispatchRateInBytes); resourcegroup.setPublishRateInMsgs(publishRateInMsgs); resourcegroup.setPublishRateInBytes(publishRateInBytes); + resourcegroup.setReplicationDispatchRateInMsgs(replicationDispatchRateInMsgs); + resourcegroup.setReplicationDispatchRateInBytes(replicationDispatchRateInBytes); getAdmin().resourcegroups().updateResourceGroup(name, resourcegroup); } From 1c4091351e44c44023b6a7c573c45f6accd76df0 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 5 Mar 2024 20:28:58 +0800 Subject: [PATCH 3/5] [fix][broker] Consume the ResourceGroup dispatch quota Signed-off-by: Zixuan Liu --- .../broker/service/persistent/PersistentReplicator.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 2d7a7cb0ba7e4..85211cd0cf7df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -397,6 +397,8 @@ public void readEntriesComplete(List entries, Object ctx) { } dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength())); + resourceGroupDispatchRateLimiter.ifPresent( + rateLimiter -> rateLimiter.consumeDispatchQuota(1, entry.getLength())); msgOut.recordEvent(headersAndPayload.readableBytes()); @@ -799,7 +801,8 @@ public void initializeDispatchRateLimiterIfNeeded() { if (resourceGroupName != null) { ResourceGroup resourceGroup = resourceGroupService.resourceGroupGet(resourceGroupName); if (resourceGroup != null) { - resourceGroupDispatchRateLimiter = Optional.of(resourceGroup.getResourceGroupReplicationDispatchLimiter()); + resourceGroupDispatchRateLimiter = Optional.of(resourceGroup + .getResourceGroupReplicationDispatchLimiter()); } } else { if (resourceGroupDispatchRateLimiter.isPresent()) { From 80f227c0baa91554cdf6733ab65aa4361b8ab86b Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 5 Mar 2024 20:30:36 +0800 Subject: [PATCH 4/5] [feat][broker] Add ResourceGroup-based dispatch rate limits to the Topic Signed-off-by: Zixuan Liu --- .../broker/resourcegroup/ResourceGroup.java | 23 +++++++++++---- .../ResourceGroupDispatchLimiter.java | 5 ++-- .../ResourceGroupRateLimiterManager.java | 22 +++++++++++++- .../resourcegroup/ResourceGroupService.java | 5 ++-- .../pulsar/broker/service/AbstractTopic.java | 6 ++++ .../apache/pulsar/broker/service/Topic.java | 5 ++++ ...PersistentDispatcherMultipleConsumers.java | 28 ++++++++++++++++++ ...sistentDispatcherSingleActiveConsumer.java | 29 +++++++++++++++++++ ...tStickyKeyDispatcherMultipleConsumers.java | 8 +++++ .../broker/admin/AdminResourceGroupTest.java | 2 +- .../ResourceGroupRateLimiterManagerTest.java | 2 +- 11 files changed, 123 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index 20e7b13590afb..41f10f45b2109 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -88,6 +88,8 @@ protected ResourceGroup(ResourceGroupService rgs, String name, this.getResourceGroupPublishLimiter()); this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager .newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor()); + this.resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager + .newDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor()); } // ctor for overriding the transport-manager fill/set buffer. @@ -103,6 +105,8 @@ protected ResourceGroup(ResourceGroupService rgs, String rgName, this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor()); this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager .newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor()); + this.resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager + .newDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor()); this.ruPublisher = rgPublisher; this.ruConsumer = rgConsumer; } @@ -114,6 +118,7 @@ public ResourceGroup(ResourceGroup other) { this.rgs = other.rgs; this.resourceGroupPublishLimiter = other.resourceGroupPublishLimiter; this.resourceGroupReplicationDispatchLimiter = other.resourceGroupReplicationDispatchLimiter; + this.resourceGroupDispatchLimiter = other.resourceGroupDispatchLimiter; this.setResourceGroupMonitoringClassFields(); // ToDo: copy the monitoring class fields, and ruPublisher/ruConsumer from other, if required. @@ -153,7 +158,9 @@ protected void updateResourceGroup(org.apache.pulsar.common.policies.data.Resour pubBmc.messages = rgConfig.getPublishRateInMsgs(); pubBmc.bytes = rgConfig.getPublishRateInBytes(); this.resourceGroupPublishLimiter.update(pubBmc); - ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, rgConfig); + ResourceGroupRateLimiterManager + .updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, rgConfig); + ResourceGroupRateLimiterManager.updateDispatchRateLimiter(resourceGroupDispatchLimiter, rgConfig); } protected long getResourceGroupNumOfNSRefs() { @@ -384,11 +391,15 @@ protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass mo monEntity.quotaForNextPeriod = newQuota; switch (monClass) { case ReplicationDispatch: - ResourceGroupRateLimiterManager.updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, newQuota); + ResourceGroupRateLimiterManager + .updateReplicationDispatchRateLimiter(resourceGroupReplicationDispatchLimiter, newQuota); break; case Publish: this.resourceGroupPublishLimiter.update(newQuota); break; + case Dispatch: + ResourceGroupRateLimiterManager.updateDispatchRateLimiter(resourceGroupDispatchLimiter, newQuota); + break; default: if (log.isDebugEnabled()) { log.debug("Doing nothing for monClass={};", monClass); @@ -601,9 +612,11 @@ private void setResourceGroupConfigParameters(org.apache.pulsar.common.policies. ? -1 : rgConfig.getDispatchRateInMsgs(); idx = ResourceGroupMonitoringClass.ReplicationDispatch.ordinal(); - this.monitoringClassFields[idx].configValuesPerPeriod.bytes = rgConfig.getReplicationDispatchRateInBytes() == null + this.monitoringClassFields[idx] + .configValuesPerPeriod.bytes = rgConfig.getReplicationDispatchRateInBytes() == null ? -1 : rgConfig.getReplicationDispatchRateInBytes(); - this.monitoringClassFields[idx].configValuesPerPeriod.messages = rgConfig.getReplicationDispatchRateInMsgs() == null + this.monitoringClassFields[idx] + .configValuesPerPeriod.messages = rgConfig.getReplicationDispatchRateInMsgs() == null ? -1 : rgConfig.getReplicationDispatchRateInMsgs(); } @@ -684,7 +697,7 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { protected ResourceGroupDispatchLimiter resourceGroupReplicationDispatchLimiter; @Getter - protected ResourceGroupDispatchLimiter resourceGroupTopicDispatchLimiter; + protected ResourceGroupDispatchLimiter resourceGroupDispatchLimiter; protected static class PerMonitoringClassFields { // This lock covers all the "local" counts (i.e., except for the per-broker usage stats). diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java index 104890fd8cd46..47063f7d6cd95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupDispatchLimiter.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,7 +28,8 @@ public class ResourceGroupDispatchLimiter implements AutoCloseable { private volatile RateLimiter dispatchRateLimiterOnMessage; private volatile RateLimiter dispatchRateLimiterOnByte; - public ResourceGroupDispatchLimiter(ScheduledExecutorService executorService, long dispatchRateInMsgs, long dispatchRateInBytes) { + public ResourceGroupDispatchLimiter(ScheduledExecutorService executorService, + long dispatchRateInMsgs, long dispatchRateInBytes) { this.executorService = executorService; update(dispatchRateInMsgs, dispatchRateInBytes); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java index e1bc7dafe25f6..0cb7e4dcf792d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManager.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -44,4 +44,24 @@ static void updateReplicationDispatchRateLimiter(ResourceGroupDispatchLimiter re BytesAndMessagesCount quota) { resourceGroupDispatchLimiter.update(quota.messages, quota.bytes); } + + static ResourceGroupDispatchLimiter newDispatchRateLimiter( + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup, + ScheduledExecutorService executorService) { + long msgs = Optional.ofNullable(resourceGroup.getDispatchRateInMsgs()).orElse(-1); + long bytes = Optional.ofNullable(resourceGroup.getDispatchRateInBytes()).orElse(-1L); + return new ResourceGroupDispatchLimiter(executorService, msgs, bytes); + } + + static void updateDispatchRateLimiter(ResourceGroupDispatchLimiter resourceGroupDispatchLimiter, + org.apache.pulsar.common.policies.data.ResourceGroup resourceGroup) { + long msgs = Optional.ofNullable(resourceGroup.getDispatchRateInMsgs()).orElse(-1); + long bytes = Optional.ofNullable(resourceGroup.getDispatchRateInBytes()).orElse(-1L); + resourceGroupDispatchLimiter.update(msgs, bytes); + } + + static void updateDispatchRateLimiter(ResourceGroupDispatchLimiter resourceGroupDispatchLimiter, + BytesAndMessagesCount quota) { + resourceGroupDispatchLimiter.update(quota.messages, quota.bytes); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index d642a9d96e2a8..3f55a9356503c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -495,8 +495,9 @@ private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdmin // Find the difference between the last time stats were updated for this topic, and the current // time. If the difference is positive, update the stats. @VisibleForTesting - protected void updateStatsWithDiff(String topicName, String replicationRemoteCluster, String tenantString, String nsString, - long accByteCount, long accMesgCount, ResourceGroupMonitoringClass monClass) { + protected void updateStatsWithDiff(String topicName, String replicationRemoteCluster, String tenantString, + String nsString, long accByteCount, long accMesgCount, + ResourceGroupMonitoringClass monClass) { ConcurrentHashMap hm; switch (monClass) { default: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 5c6657d4a7a0d..1cbed4ac7f39f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -49,6 +49,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resourcegroup.ResourceGroup; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter; import org.apache.pulsar.broker.resourcegroup.ResourceGroupService; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; @@ -120,6 +121,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener resourceGroupDispatchRateLimiter = Optional.empty(); + protected boolean preciseTopicPublishRateLimitingEnable; @Getter @@ -1153,6 +1157,7 @@ public void updateResourceGroupLimiter() { this.resourceGroupRateLimitingEnabled = true; this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter(); this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead); + this.resourceGroupDispatchRateLimiter = Optional.of(resourceGroup.getResourceGroupDispatchLimiter()); log.info("Using resource group {} rate limiter for topic {}", rgName, topic); } } else { @@ -1166,6 +1171,7 @@ public void updateResourceGroupLimiter() { protected void closeResourceGroupLimiter() { if (resourceGroupRateLimitingEnabled) { this.resourceGroupPublishLimiter = null; + this.resourceGroupDispatchRateLimiter = Optional.empty(); this.resourceGroupRateLimitingEnabled = false; brokerService.getPulsar().getResourceGroupServiceManager().unRegisterTopic(TopicName.get(topic)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index e2ffb41390a7e..9ad3b5cdce1d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; @@ -286,6 +287,10 @@ default Optional getDispatchRateLimiter() { return Optional.empty(); } + default Optional getResourceGroupDispatchRateLimiter() { + return Optional.empty(); + } + default Optional getSubscribeRateLimiter() { return Optional.empty(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 91a155a91574d..2580652856d97 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -42,6 +42,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; @@ -398,6 +399,27 @@ protected Pair calculateToRead(int currentTotalAvailablePermits) bytesToRead = calculateToRead.getRight(); } } + + if (topic.getResourceGroupDispatchRateLimiter().isPresent()) { + ResourceGroupDispatchLimiter limiter = topic.getResourceGroupDispatchRateLimiter().get(); + long availableDispatchRateLimitOnMsg = limiter.getAvailableDispatchRateLimitOnMsg(); + long availableDispatchRateLimitOnByte = limiter.getAvailableDispatchRateLimitOnByte(); + if (availableDispatchRateLimitOnMsg == 0 || availableDispatchRateLimitOnByte == 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] message-read exceeded resourcegroup message-rate {}/{}, schedule after a {}", + name, limiter.getDispatchRateOnMsg(), limiter.getDispatchRateOnByte(), + MESSAGE_RATE_BACKOFF_MS); + } + reScheduleRead(); + return Pair.of(-1, -1L); + } else { + Pair calculateToRead = + computeReadLimits(messagesToRead, (int) availableDispatchRateLimitOnMsg, bytesToRead, + availableDispatchRateLimitOnByte); + messagesToRead = calculateToRead.getLeft(); + bytesToRead = calculateToRead.getRight(); + } + } } if (havePendingReplayRead) { @@ -658,6 +680,12 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis if (dispatchRateLimiter.isPresent()) { dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent); } + + Optional resourceGroupDispatchRateLimiter = + topic.getResourceGroupDispatchRateLimiter(); + if (resourceGroupDispatchRateLimiter.isPresent()) { + resourceGroupDispatchRateLimiter.get().consumeDispatchQuota(permits, totalBytesSent); + } } if (entriesToDispatch > 0) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 446a19de47593..5d384def81004 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -39,6 +39,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; @@ -237,6 +238,10 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List e dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(permits, sendMessageInfo.getTotalBytes())); + topic.getResourceGroupDispatchRateLimiter().ifPresent(resourceGroupDispatchLimiter -> + resourceGroupDispatchLimiter.consumeDispatchQuota(permits, + sendMessageInfo.getTotalBytes()) + ); } // Schedule a new read batch operation only after the previous batch has been written to the socket. @@ -469,6 +474,30 @@ protected Pair calculateToRead(Consumer consumer) { bytesToRead = calculateToRead.getRight(); } } + + if (topic.getResourceGroupDispatchRateLimiter().isPresent()) { + ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = topic.getResourceGroupDispatchRateLimiter() + .get(); + long availableDispatchRateLimitOnMsg = + resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnMsg(); + long availableDispatchRateLimitOnByte = + resourceGroupDispatchLimiter.getAvailableDispatchRateLimitOnByte(); + if (availableDispatchRateLimitOnMsg == 0 || availableDispatchRateLimitOnByte == 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] message-read exceeded resourcegroup message-rate {}/{}, schedule after a {}", + name, availableDispatchRateLimitOnMsg, availableDispatchRateLimitOnByte, + MESSAGE_RATE_BACKOFF_MS); + } + reScheduleRead(); + return Pair.of(-1, -1L); + } else { + Pair calculateToRead = + computeReadLimits(messagesToRead, (int) availableDispatchRateLimitOnMsg, bytesToRead, + availableDispatchRateLimitOnByte); + messagesToRead = calculateToRead.getLeft(); + bytesToRead = calculateToRead.getRight(); + } + } } // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 2cd05fe00457a..6e3065848ab19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -38,6 +39,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Consumer; @@ -313,6 +315,12 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis if (dispatchRateLimiter.isPresent()) { dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent); } + + Optional resourceGroupDispatchRateLimiter = + topic.getResourceGroupDispatchRateLimiter(); + if (resourceGroupDispatchRateLimiter.isPresent()) { + resourceGroupDispatchRateLimiter.get().consumeDispatchQuota(permits, totalBytesSent); + } } if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java index 7c539e13044f8..5b7019f69233b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceGroupTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java index fc9862456d41a..4ed8846caa24b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterManagerTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information From e40b1714cbfd93182553cfb40220a56e8273f61b Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 11 Mar 2024 16:43:46 +0800 Subject: [PATCH 5/5] Address comments Signed-off-by: Zixuan Liu --- .../pulsar/broker/resourcegroup/ResourceGroup.java | 7 +++++-- .../broker/resourcegroup/ResourceGroupService.java | 10 +++++----- .../broker/resourcegroup/ResourceGroupServiceTest.java | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index 41f10f45b2109..036d7e22dbcb7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -70,8 +70,7 @@ public enum ResourceGroupMonitoringClass { public enum ResourceGroupRefTypes { Tenants, Namespaces, - Topics, - Replicators, + Topics } // Default ctor: it is not expected that anything outside of this package will need to directly @@ -88,8 +87,11 @@ protected ResourceGroup(ResourceGroupService rgs, String name, this.getResourceGroupPublishLimiter()); this.resourceGroupReplicationDispatchLimiter = ResourceGroupRateLimiterManager .newReplicationDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor()); + log.info("attaching replication dispatch rate limiter {} to {}", this.resourceGroupReplicationDispatchLimiter, + name); this.resourceGroupDispatchLimiter = ResourceGroupRateLimiterManager .newDispatchRateLimiter(rgConfig, rgs.getPulsar().getExecutor()); + log.info("attaching topic dispatch rate limiter {} to {}", this.resourceGroupDispatchLimiter, name); } // ctor for overriding the transport-manager fill/set buffer. @@ -190,6 +192,7 @@ protected ResourceGroupOpStatus registerUsage(String name, ResourceGroupRefTypes break; case Topics: set = this.resourceGroupTopicRefs; + break; } if (ref) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index 3f55a9356503c..a531a73c3b4a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -386,14 +386,14 @@ private void incrementUsage(ResourceGroup resourceGroup, * ToDo: will this distinction result in "expected semantics", or shock from users? * For now, the only caller is internal to this class. * - * @param topicName Complete topic name * @param tenantName * @param nsName Complete namespace name + * @param topicName Complete topic name * @param monClass * @param incStats * @returns true if the stats were updated; false if nothing was updated. */ - protected boolean incrementUsage(String topicName, String tenantName, String nsName, + protected boolean incrementUsage(String tenantName, String nsName, String topicName, ResourceGroupMonitoringClass monClass, BytesAndMessagesCount incStats) throws PulsarAdminException { final ResourceGroup nsRG = this.namespaceToRGsMap.get(NamespaceName.get(nsName)); @@ -496,7 +496,7 @@ private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdmin // time. If the difference is positive, update the stats. @VisibleForTesting protected void updateStatsWithDiff(String topicName, String replicationRemoteCluster, String tenantString, - String nsString, long accByteCount, long accMesgCount, + String nsString, long accByteCount, long accMsgCount, ResourceGroupMonitoringClass monClass) { ConcurrentHashMap hm; switch (monClass) { @@ -522,7 +522,7 @@ protected void updateStatsWithDiff(String topicName, String replicationRemoteClu BytesAndMessagesCount bmNewCount = new BytesAndMessagesCount(); bmNewCount.bytes = accByteCount; - bmNewCount.messages = accMesgCount; + bmNewCount.messages = accMsgCount; String key; if (monClass == ResourceGroupMonitoringClass.ReplicationDispatch) { @@ -544,7 +544,7 @@ protected void updateStatsWithDiff(String topicName, String replicationRemoteClu } try { - boolean statsUpdated = this.incrementUsage(topicName, tenantString, nsString, monClass, bmDiff); + boolean statsUpdated = this.incrementUsage(tenantString, nsString, topicName, monClass, bmDiff); if (log.isDebugEnabled()) { log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; " + "by {} bytes, {} mesgs", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index 5d4bce643e6df..75d8306d48d04 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -122,7 +122,7 @@ public void measureOpsTime() throws PulsarAdminException { for (int ix = 0; ix < numPerfTestIterations; ix++) { for (int monClassIdx = 0; monClassIdx < ResourceGroupMonitoringClass.values().length; monClassIdx++) { monClass = ResourceGroupMonitoringClass.values()[monClassIdx]; - rgs.incrementUsage(topicName.toString(), tenantName, namespaceName, monClass, stats); + rgs.incrementUsage(tenantName, namespaceName, topicName.toString(), monClass, stats); } } mSecsEnd = System.currentTimeMillis();