Skip to content

Commit

Permalink
[fix][broker] Clean stats when resource is closed (#23)
Browse files Browse the repository at this point in the history
* [fix][broker] Clean stats when resource is closed

Signed-off-by: Zixuan Liu <[email protected]>

* Fix test

Signed-off-by: Zixuan Liu <[email protected]>

---------

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece authored Aug 20, 2024
1 parent 87009ee commit faac751
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.val;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -293,6 +296,36 @@ public void unRegisterNameSpace(String resourceGroupName, NamespaceName fqNamesp
throw new PulsarAdminException(errMesg);
}

aggregateLock.lock();

Set<String> invalidateAllKeyForProduce = new HashSet<>();
topicProduceStats.asMap().forEach((key, value) -> {
TopicName topicName = TopicName.get(key);
if (topicName.getNamespaceObject().equals(fqNamespaceName)) {
invalidateAllKeyForProduce.add(key);
}
});
topicProduceStats.invalidateAll(invalidateAllKeyForProduce);

Set<String> invalidateAllKeyForReplication = new HashSet<>();
topicToReplicatorsMap.forEach((key, value) -> {
TopicName topicName = TopicName.get(key);
if (topicName.getNamespaceObject().equals(fqNamespaceName)) {
value.forEach(n -> invalidateAllKeyForReplication.add(key));
}
});
replicationDispatchStats.invalidateAll(invalidateAllKeyForReplication);

Set<TopicName> invalidateAllKeyForConsumer = new HashSet<>();
topicConsumeStats.asMap().forEach((key, value) -> {
TopicName topicName = TopicName.get(key);
if (topicName.getNamespaceObject().equals(fqNamespaceName)) {
invalidateAllKeyForConsumer.add(topicName);
}
});
topicConsumeStats.invalidate(invalidateAllKeyForConsumer);

aggregateLock.unlock();
// Dissociate this NS-name from the RG.
this.namespaceToRGsMap.remove(fqNamespaceName, rg);
rgNamespaceUnRegisters.labels(resourceGroupName).inc();
Expand Down Expand Up @@ -326,15 +359,24 @@ public void registerTopic(String resourceGroupName, TopicName topicName) {
/**
* UnRegisters a topic from a resource group.
*
* @param topicName complete topic name
* @param topicName complete topic name
*/
public void unRegisterTopic(TopicName topicName) {
ResourceGroup remove = this.topicToRGsMap.remove(topicName);
aggregateLock.lock();
String topicNameString = topicName.toString();
ResourceGroup remove = topicToRGsMap.remove(topicName);
if (remove != null) {
remove.registerUsage(topicName.toString(), ResourceGroupRefTypes.Topics,
remove.registerUsage(topicNameString, ResourceGroupRefTypes.Topics,
false, this.resourceUsageTransportManagerMgr);
rgTopicUnRegisters.labels(remove.resourceGroupName).inc();
topicProduceStats.invalidate(topicNameString);
topicConsumeStats.invalidate(topicNameString);
Set<String> replicators = topicToReplicatorsMap.remove(topicNameString);
if (replicators != null) {
replicationDispatchStats.invalidateAll(replicators);
}
}
aggregateLock.unlock();
}

/**
Expand Down Expand Up @@ -496,6 +538,10 @@ private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdmin
return rg;
}

private String getReplicatorKey(String topic, String replicationRemoteCluster) {
return topic + replicationRemoteCluster;
}

// Find the difference between the last time stats were updated for this topic, and the current
// time. If the difference is positive, update the stats.
@VisibleForTesting
Expand Down Expand Up @@ -530,7 +576,14 @@ protected void updateStatsWithDiff(String topicName, String replicationRemoteClu

String key;
if (monClass == ResourceGroupMonitoringClass.ReplicationDispatch) {
key = topicName + replicationRemoteCluster;
key = getReplicatorKey(topicName, replicationRemoteCluster);
topicToReplicatorsMap.compute(key, (n, value) -> {
if (value == null) {
value = new CopyOnWriteArraySet<>();
}
value.add(replicationRemoteCluster);
return value;
});
} else {
key = topicName;
}
Expand Down Expand Up @@ -644,6 +697,7 @@ protected void aggregateResourceGroupLocalUsages() {
BrokerService bs = this.pulsar.getBrokerService();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();

aggregateLock.lock();
for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) {
final String topicName = entry.getKey();
final TopicStats topicStats = entry.getValue();
Expand Down Expand Up @@ -676,6 +730,7 @@ protected void aggregateResourceGroupLocalUsages() {
topicStats.getBytesOutCounter(), topicStats.getMsgOutCounter(),
ResourceGroupMonitoringClass.Dispatch);
}
aggregateLock.unlock();
double diffTimeSeconds = aggrUsageTimer.observeDuration();
if (log.isDebugEnabled()) {
log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000);
Expand Down Expand Up @@ -869,6 +924,8 @@ protected Cache<String, BytesAndMessagesCount> newStatsCache(long durationMS) {
// Given a qualified NS-name (i.e., in "tenant/namespace" format), record its associated resource-group
private ConcurrentHashMap<NamespaceName, ResourceGroup> namespaceToRGsMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<TopicName, ResourceGroup> topicToRGsMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, Set<String>> topicToReplicatorsMap = new ConcurrentHashMap<>();
private ReentrantLock aggregateLock = new ReentrantLock();

// Maps to maintain the usage per topic, in produce/consume directions.
private Cache<String, BytesAndMessagesCount> topicProduceStats;
Expand Down Expand Up @@ -990,6 +1047,11 @@ Cache<String, BytesAndMessagesCount> getReplicationDispatchStats() {
return this.replicationDispatchStats;
}

@VisibleForTesting
ConcurrentHashMap<String, Set<String>> getTopicToReplicatorsMap() {
return this.topicToReplicatorsMap;
}

@VisibleForTesting
ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() {
return this.aggregateLocalUsagePeriodicTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,10 @@ protected void closeResourceGroupLimiter() {
this.resourceGroupPublishLimiter = null;
this.resourceGroupDispatchRateLimiter = Optional.empty();
this.resourceGroupRateLimitingEnabled = false;
brokerService.getPulsar().getResourceGroupServiceManager().unRegisterTopic(TopicName.get(topic));
}
ResourceGroupService resourceGroupServiceManager = brokerService.getPulsar().getResourceGroupServiceManager();
if (resourceGroupServiceManager != null) {
resourceGroupServiceManager.unRegisterTopic(TopicName.get(topic));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Policy.Expiration;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerBrokerUsageStats;
Expand All @@ -33,9 +35,12 @@
import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -277,6 +282,57 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep
Assert.assertThrows(PulsarAdminException.class, () -> rgs.getPublishRateLimiters(rgName));

Assert.assertEquals(rgs.getNumResourceGroups(), 0);

Assert.assertEquals(rgs.getTopicConsumeStats().estimatedSize(), 0);
Assert.assertEquals(rgs.getTopicProduceStats().estimatedSize(), 0);
Assert.assertEquals(rgs.getReplicationDispatchStats().estimatedSize(), 0);
Assert.assertEquals(rgs.getTopicToReplicatorsMap().size(), 0);
}

@Test
public void testCleanupStatsWhenNamespaceDeleted()
throws PulsarAdminException, PulsarClientException, InterruptedException {
String tenantName = UUID.randomUUID().toString();
admin.tenants().createTenant(tenantName,
TenantInfo.builder().allowedClusters(new HashSet<>(admin.clusters().getClusters())).build());
String nsName = tenantName + "/" + UUID.randomUUID();
admin.namespaces().createNamespace(nsName);
org.apache.pulsar.common.policies.data.ResourceGroup rgConfig =
new org.apache.pulsar.common.policies.data.ResourceGroup();
final String rgName = UUID.randomUUID().toString();
rgConfig.setPublishRateInBytes(15000L);
rgConfig.setPublishRateInMsgs(100);
rgConfig.setDispatchRateInBytes(40000L);
rgConfig.setDispatchRateInMsgs(500);
rgConfig.setReplicationDispatchRateInBytes(2000L);
rgConfig.setReplicationDispatchRateInMsgs(400L);

admin.resourcegroups().createResourceGroup(rgName, rgConfig);
admin.namespaces().setNamespaceResourceGroup(nsName, rgName);
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
Assert.assertNotNull(rgs.getNamespaceResourceGroup(NamespaceName.get(nsName)));
});

String topic = nsName + "/" + UUID.randomUUID();
@Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
producer.send("hi".getBytes(StandardCharsets.UTF_8));

rgs.aggregateResourceGroupLocalUsages();
producer.close();
Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 1);
Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 0);
Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 0);
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(nsName);
admin.resourcegroups().deleteResourceGroup(rgName);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 0);
Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 0);
Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 0);
Assert.assertNull(rgs.getNamespaceResourceGroup(NamespaceName.get(nsName)));
});
}

@Test
Expand Down

0 comments on commit faac751

Please sign in to comment.