From faac7517b7eb10f038c2af0f42ce28f9b9d942f4 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 20 Aug 2024 09:32:27 +0800 Subject: [PATCH] [fix][broker] Clean stats when resource is closed (#23) * [fix][broker] Clean stats when resource is closed Signed-off-by: Zixuan Liu * Fix test Signed-off-by: Zixuan Liu --------- Signed-off-by: Zixuan Liu --- .../resourcegroup/ResourceGroupService.java | 70 +++++++++++++++++-- .../pulsar/broker/service/AbstractTopic.java | 5 +- .../ResourceGroupServiceTest.java | 56 +++++++++++++++ 3 files changed, 126 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index 740183e3659d4..19ef50e303d00 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -25,11 +25,14 @@ import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import io.prometheus.client.Summary; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import lombok.Getter; import lombok.val; import org.apache.pulsar.broker.PulsarService; @@ -293,6 +296,36 @@ public void unRegisterNameSpace(String resourceGroupName, NamespaceName fqNamesp throw new PulsarAdminException(errMesg); } + aggregateLock.lock(); + + Set invalidateAllKeyForProduce = new HashSet<>(); + topicProduceStats.asMap().forEach((key, value) -> { + TopicName topicName = TopicName.get(key); + if (topicName.getNamespaceObject().equals(fqNamespaceName)) { + invalidateAllKeyForProduce.add(key); + } + }); + topicProduceStats.invalidateAll(invalidateAllKeyForProduce); + + Set invalidateAllKeyForReplication = new HashSet<>(); + topicToReplicatorsMap.forEach((key, value) -> { + TopicName topicName = TopicName.get(key); + if (topicName.getNamespaceObject().equals(fqNamespaceName)) { + value.forEach(n -> invalidateAllKeyForReplication.add(key)); + } + }); + replicationDispatchStats.invalidateAll(invalidateAllKeyForReplication); + + Set invalidateAllKeyForConsumer = new HashSet<>(); + topicConsumeStats.asMap().forEach((key, value) -> { + TopicName topicName = TopicName.get(key); + if (topicName.getNamespaceObject().equals(fqNamespaceName)) { + invalidateAllKeyForConsumer.add(topicName); + } + }); + topicConsumeStats.invalidate(invalidateAllKeyForConsumer); + + aggregateLock.unlock(); // Dissociate this NS-name from the RG. this.namespaceToRGsMap.remove(fqNamespaceName, rg); rgNamespaceUnRegisters.labels(resourceGroupName).inc(); @@ -326,15 +359,24 @@ public void registerTopic(String resourceGroupName, TopicName topicName) { /** * UnRegisters a topic from a resource group. * - * @param topicName complete topic name + * @param topicName complete topic name */ public void unRegisterTopic(TopicName topicName) { - ResourceGroup remove = this.topicToRGsMap.remove(topicName); + aggregateLock.lock(); + String topicNameString = topicName.toString(); + ResourceGroup remove = topicToRGsMap.remove(topicName); if (remove != null) { - remove.registerUsage(topicName.toString(), ResourceGroupRefTypes.Topics, + remove.registerUsage(topicNameString, ResourceGroupRefTypes.Topics, false, this.resourceUsageTransportManagerMgr); rgTopicUnRegisters.labels(remove.resourceGroupName).inc(); + topicProduceStats.invalidate(topicNameString); + topicConsumeStats.invalidate(topicNameString); + Set replicators = topicToReplicatorsMap.remove(topicNameString); + if (replicators != null) { + replicationDispatchStats.invalidateAll(replicators); + } } + aggregateLock.unlock(); } /** @@ -496,6 +538,10 @@ private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdmin return rg; } + private String getReplicatorKey(String topic, String replicationRemoteCluster) { + return topic + replicationRemoteCluster; + } + // Find the difference between the last time stats were updated for this topic, and the current // time. If the difference is positive, update the stats. @VisibleForTesting @@ -530,7 +576,14 @@ protected void updateStatsWithDiff(String topicName, String replicationRemoteClu String key; if (monClass == ResourceGroupMonitoringClass.ReplicationDispatch) { - key = topicName + replicationRemoteCluster; + key = getReplicatorKey(topicName, replicationRemoteCluster); + topicToReplicatorsMap.compute(key, (n, value) -> { + if (value == null) { + value = new CopyOnWriteArraySet<>(); + } + value.add(replicationRemoteCluster); + return value; + }); } else { key = topicName; } @@ -644,6 +697,7 @@ protected void aggregateResourceGroupLocalUsages() { BrokerService bs = this.pulsar.getBrokerService(); Map topicStatsMap = bs.getTopicStats(); + aggregateLock.lock(); for (Map.Entry entry : topicStatsMap.entrySet()) { final String topicName = entry.getKey(); final TopicStats topicStats = entry.getValue(); @@ -676,6 +730,7 @@ protected void aggregateResourceGroupLocalUsages() { topicStats.getBytesOutCounter(), topicStats.getMsgOutCounter(), ResourceGroupMonitoringClass.Dispatch); } + aggregateLock.unlock(); double diffTimeSeconds = aggrUsageTimer.observeDuration(); if (log.isDebugEnabled()) { log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000); @@ -869,6 +924,8 @@ protected Cache newStatsCache(long durationMS) { // Given a qualified NS-name (i.e., in "tenant/namespace" format), record its associated resource-group private ConcurrentHashMap namespaceToRGsMap = new ConcurrentHashMap<>(); private ConcurrentHashMap topicToRGsMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap> topicToReplicatorsMap = new ConcurrentHashMap<>(); + private ReentrantLock aggregateLock = new ReentrantLock(); // Maps to maintain the usage per topic, in produce/consume directions. private Cache topicProduceStats; @@ -990,6 +1047,11 @@ Cache getReplicationDispatchStats() { return this.replicationDispatchStats; } + @VisibleForTesting + ConcurrentHashMap> getTopicToReplicatorsMap() { + return this.topicToReplicatorsMap; + } + @VisibleForTesting ScheduledFuture getAggregateLocalUsagePeriodicTask() { return this.aggregateLocalUsagePeriodicTask; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 1cbed4ac7f39f..9b20a57506c98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1173,7 +1173,10 @@ protected void closeResourceGroupLimiter() { this.resourceGroupPublishLimiter = null; this.resourceGroupDispatchRateLimiter = Optional.empty(); this.resourceGroupRateLimitingEnabled = false; - brokerService.getPulsar().getResourceGroupServiceManager().unRegisterTopic(TopicName.get(topic)); + } + ResourceGroupService resourceGroupServiceManager = brokerService.getPulsar().getResourceGroupServiceManager(); + if (resourceGroupServiceManager != null) { + resourceGroupServiceManager.unRegisterTopic(TopicName.get(topic)); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index 4c260507ebc9c..fe2d7146fdf93 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -20,11 +20,13 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Policy.Expiration; +import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerBrokerUsageStats; @@ -33,9 +35,12 @@ import org.apache.pulsar.broker.service.resource.usage.NetworkUsage; import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -277,6 +282,57 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep Assert.assertThrows(PulsarAdminException.class, () -> rgs.getPublishRateLimiters(rgName)); Assert.assertEquals(rgs.getNumResourceGroups(), 0); + + Assert.assertEquals(rgs.getTopicConsumeStats().estimatedSize(), 0); + Assert.assertEquals(rgs.getTopicProduceStats().estimatedSize(), 0); + Assert.assertEquals(rgs.getReplicationDispatchStats().estimatedSize(), 0); + Assert.assertEquals(rgs.getTopicToReplicatorsMap().size(), 0); + } + + @Test + public void testCleanupStatsWhenNamespaceDeleted() + throws PulsarAdminException, PulsarClientException, InterruptedException { + String tenantName = UUID.randomUUID().toString(); + admin.tenants().createTenant(tenantName, + TenantInfo.builder().allowedClusters(new HashSet<>(admin.clusters().getClusters())).build()); + String nsName = tenantName + "/" + UUID.randomUUID(); + admin.namespaces().createNamespace(nsName); + org.apache.pulsar.common.policies.data.ResourceGroup rgConfig = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + final String rgName = UUID.randomUUID().toString(); + rgConfig.setPublishRateInBytes(15000L); + rgConfig.setPublishRateInMsgs(100); + rgConfig.setDispatchRateInBytes(40000L); + rgConfig.setDispatchRateInMsgs(500); + rgConfig.setReplicationDispatchRateInBytes(2000L); + rgConfig.setReplicationDispatchRateInMsgs(400L); + + admin.resourcegroups().createResourceGroup(rgName, rgConfig); + admin.namespaces().setNamespaceResourceGroup(nsName, rgName); + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + Assert.assertNotNull(rgs.getNamespaceResourceGroup(NamespaceName.get(nsName))); + }); + + String topic = nsName + "/" + UUID.randomUUID(); + @Cleanup + Producer producer = + pulsarClient.newProducer().topic(topic).create(); + producer.send("hi".getBytes(StandardCharsets.UTF_8)); + + rgs.aggregateResourceGroupLocalUsages(); + producer.close(); + Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 1); + Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 0); + Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 0); + admin.topics().delete(topic); + admin.namespaces().deleteNamespace(nsName); + admin.resourcegroups().deleteResourceGroup(rgName); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 0); + Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 0); + Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 0); + Assert.assertNull(rgs.getNamespaceResourceGroup(NamespaceName.get(nsName))); + }); } @Test