Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Improve ResourceGroup #14

Merged
merged 3 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/run_unit_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ function broker_group_1() {
}

function broker_group_2() {
$MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,websocket,other' -DtestReuseFork=false
$MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,websocket,other' -DtestReuseFork=false -DfailIfNoTests=false
}

function broker_group_3() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,33 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int resourceUsageTransportPublishIntervalInSecs = 60;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "The percentage difference that is considered \"within limits\" to suppress usage reporting"
+ "Setting this to 0 will also make us report in every round."
)
private int resourceUsageReportSuppressionTolerancePercentage = 5;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "The maximum number of successive rounds that we can suppress reporting local usage, because there "
+ "was no substantial change from the prior round. This is to ensure the reporting does not "
+ "become too chatty. Set this value to one more than the cadence of sending reports; e.g., if "
+ "you want to send every 3rd report, set the value to 4."
+ "Setting this to 0 will make us report in every round."
+ "Don't set to negative values; behavior will be disabled"
)
private int resourceUsageMaxUsageReportSuppressRounds = 5;

@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "ResourceGroup rate limit will be triggered when the total traffic exceeds the product of the "
+ "rate-limit value and the threshold. Value range: [0,1]."
)
private double resourceGroupLocalQuotaThreshold = 0;

// <-- dispatcher read settings -->
@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.ToString;
import lombok.val;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupOpStatus;
import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
Expand All @@ -49,6 +53,7 @@ public class ResourceGroup {
/**
* Convenience class for bytes and messages counts, which are used together in a lot of the following code.
*/
@ToString
public static class BytesAndMessagesCount {
public long bytes;
public long messages;
Expand Down Expand Up @@ -356,7 +361,7 @@ protected BytesAndMessagesCount getLocalUsageStatsFromBrokerReports(ResourceGrou

monEntity.usageFromOtherBrokersLock.lock();
try {
pbus = monEntity.usageFromOtherBrokers.get(myBrokerId);
pbus = monEntity.usageFromOtherBrokers.getIfPresent(myBrokerId);
} finally {
monEntity.usageFromOtherBrokersLock.unlock();
}
Expand All @@ -380,7 +385,7 @@ protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass
monEntity.usageFromOtherBrokersLock.lock();
BytesAndMessagesCount retStats = new BytesAndMessagesCount();
try {
monEntity.usageFromOtherBrokers.forEach((broker, brokerUsage) -> {
monEntity.usageFromOtherBrokers.asMap().forEach((broker, brokerUsage) -> {
retStats.bytes += brokerUsage.usedValues.bytes;
retStats.messages += brokerUsage.usedValues.messages;
});
Expand Down Expand Up @@ -512,8 +517,6 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas
monEntity.lastReportedValues.bytes = bytesUsed;
monEntity.lastReportedValues.messages = messagesUsed;
monEntity.numSuppressedUsageReports = 0;
monEntity.totalUsedLocally.bytes += bytesUsed;
monEntity.totalUsedLocally.messages += messagesUsed;
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
} else {
numSuppressions = monEntity.numSuppressedUsageReports++;
Expand Down Expand Up @@ -552,7 +555,7 @@ private void getUsageFromMonitoredEntity(ResourceGroupMonitoringClass monClass,
long newByteCount, newMessageCount;

monEntity = this.monitoringClassFields[idx];
usageStats = monEntity.usageFromOtherBrokers.get(broker);
usageStats = monEntity.usageFromOtherBrokers.getIfPresent(broker);
if (usageStats == null) {
usageStats = new PerBrokerUsageStats();
usageStats.usedValues = new BytesAndMessagesCount();
Expand All @@ -564,7 +567,8 @@ private void getUsageFromMonitoredEntity(ResourceGroupMonitoringClass monClass,
newMessageCount = p.getMessagesPerPeriod();
usageStats.usedValues.messages = newMessageCount;
usageStats.lastResourceUsageReadTimeMSecsSinceEpoch = System.currentTimeMillis();
oldUsageStats = monEntity.usageFromOtherBrokers.put(broker, usageStats);
oldUsageStats = monEntity.usageFromOtherBrokers.getIfPresent(broker);
monEntity.usageFromOtherBrokers.put(broker, usageStats);
} finally {
monEntity.usageFromOtherBrokersLock.unlock();
}
Expand All @@ -587,21 +591,18 @@ private void getUsageFromMonitoredEntity(ResourceGroupMonitoringClass monClass,
}

private void setResourceGroupMonitoringClassFields() {
PerMonitoringClassFields monClassFields;
ServiceConfiguration conf = rgs.getPulsar().getConfiguration();
long resourceUsageTransportPublishIntervalInSecs = conf.getResourceUsageTransportPublishIntervalInSecs();
int maxUsageReportSuppressRounds = Math.max(conf.getResourceUsageMaxUsageReportSuppressRounds(), 1);
long cacheInMS = TimeUnit.SECONDS.toMillis(
resourceUsageTransportPublishIntervalInSecs * maxUsageReportSuppressRounds * 2);
// Usage report data is cached to the memory, when the broker is restart or offline, we need an elimination
// strategy to release the quota occupied by other broker.
//
// Considering that each broker starts at a different time, the cache time should be equal to the mandatory
// reporting period * 2.
for (int idx = 0; idx < ResourceGroupMonitoringClass.values().length; idx++) {
this.monitoringClassFields[idx] = new PerMonitoringClassFields();

monClassFields = this.monitoringClassFields[idx];
monClassFields.configValuesPerPeriod = new BytesAndMessagesCount();
monClassFields.usedLocallySinceLastReport = new BytesAndMessagesCount();
monClassFields.lastReportedValues = new BytesAndMessagesCount();
monClassFields.quotaForNextPeriod = new BytesAndMessagesCount();
monClassFields.totalUsedLocally = new BytesAndMessagesCount();
monClassFields.usageFromOtherBrokers = new HashMap<>();

monClassFields.usageFromOtherBrokersLock = new ReentrantLock();
// ToDo: Change the following to a ReadWrite lock if needed.
monClassFields.localUsageStatsLock = new ReentrantLock();
this.monitoringClassFields[idx] = PerMonitoringClassFields.create(cacheInMS);
}
}

Expand Down Expand Up @@ -737,11 +738,33 @@ protected static class PerMonitoringClassFields {
int numSuppressedUsageReports;

// Accumulated stats of local usage.
@VisibleForTesting
BytesAndMessagesCount totalUsedLocally;

// This lock covers all the non-local usage counts, received from other brokers.
Lock usageFromOtherBrokersLock;
public HashMap<String, PerBrokerUsageStats> usageFromOtherBrokers;
public Cache<String, PerBrokerUsageStats> usageFromOtherBrokers;

private PerMonitoringClassFields(){

}

static PerMonitoringClassFields create(long durationMs) {
PerMonitoringClassFields perMonitoringClassFields = new PerMonitoringClassFields();
perMonitoringClassFields.configValuesPerPeriod = new BytesAndMessagesCount();
perMonitoringClassFields.usedLocallySinceLastReport = new BytesAndMessagesCount();
perMonitoringClassFields.lastReportedValues = new BytesAndMessagesCount();
perMonitoringClassFields.quotaForNextPeriod = new BytesAndMessagesCount();
perMonitoringClassFields.totalUsedLocally = new BytesAndMessagesCount();
perMonitoringClassFields.usageFromOtherBrokersLock = new ReentrantLock();
// ToDo: Change the following to a ReadWrite lock if needed.
perMonitoringClassFields.localUsageStatsLock = new ReentrantLock();

perMonitoringClassFields.usageFromOtherBrokers = Caffeine.newBuilder()
.expireAfterWrite(durationMs, TimeUnit.MILLISECONDS)
.build();
return perMonitoringClassFields;
}
}

// Usage stats for this RG obtained from other brokers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,27 @@ public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
}
}

/**
* It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed.
*
* @param numberOfMessages
* @param byteSize
*/
public boolean tryAcquire(long numberOfMessages, long byteSize) {
if (numberOfMessages > 0 && dispatchRateLimiterOnMessage != null) {
if (!dispatchRateLimiterOnMessage.tryAcquire(numberOfMessages)) {
return false;
}
}
if (byteSize > 0 && dispatchRateLimiterOnByte != null) {
if (!dispatchRateLimiterOnByte.tryAcquire(byteSize)) {
return false;
}
}

return true;
}

/**
* Checks if dispatch-rate limiting is enabled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.resourcegroup;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
Expand Down Expand Up @@ -60,7 +62,7 @@ public class ResourceGroupService implements AutoCloseable{
public ResourceGroupService(PulsarService pulsar) {
this.pulsar = pulsar;
this.timeUnitScale = TimeUnit.SECONDS;
this.quotaCalculator = new ResourceQuotaCalculatorImpl();
this.quotaCalculator = new ResourceQuotaCalculatorImpl(pulsar);
this.resourceUsageTransportManagerMgr = pulsar.getResourceUsageTransportManager();
this.rgConfigListener = new ResourceGroupConfigListener(this, pulsar);
this.initialize();
Expand Down Expand Up @@ -360,8 +362,9 @@ public void close() throws Exception {
resourceGroupsMap.clear();
tenantToRGsMap.clear();
namespaceToRGsMap.clear();
topicProduceStats.clear();
topicConsumeStats.clear();
topicProduceStats.invalidateAll();
topicConsumeStats.invalidateAll();
replicationDispatchStats.invalidateAll();
}

private void incrementUsage(ResourceGroup resourceGroup,
Expand Down Expand Up @@ -498,7 +501,7 @@ private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdmin
protected void updateStatsWithDiff(String topicName, String replicationRemoteCluster, String tenantString,
String nsString, long accByteCount, long accMsgCount,
ResourceGroupMonitoringClass monClass) {
ConcurrentHashMap<String, BytesAndMessagesCount> hm;
Cache<String, BytesAndMessagesCount> hm;
switch (monClass) {
default:
log.error("updateStatsWithDiff: Unknown monitoring class={}; ignoring", monClass);
Expand Down Expand Up @@ -530,7 +533,7 @@ protected void updateStatsWithDiff(String topicName, String replicationRemoteClu
} else {
key = topicName;
}
bmOldCount = hm.get(key);
bmOldCount = hm.getIfPresent(key);
if (bmOldCount == null) {
bmDiff.bytes = bmNewCount.bytes;
bmDiff.messages = bmNewCount.messages;
Expand Down Expand Up @@ -650,8 +653,8 @@ protected void aggregateResourceGroupLocalUsages() {

topicStats.getReplication().forEach((remoteCluster, stats) -> {
this.updateStatsWithDiff(topicName, remoteCluster, tenantString, nsString,
(long) stats.getMsgThroughputOut(),
(long) stats.getMsgRateOut(),
stats.getBytesOutCounter(),
stats.getMsgOutCounter(),
ResourceGroupMonitoringClass.ReplicationDispatch
);
});
Expand Down Expand Up @@ -779,8 +782,6 @@ protected void calculateQuotaForAllResourceGroups() {
newPeriodInSeconds,
timeUnitScale);
this.resourceUsagePublishPeriodInSeconds = newPeriodInSeconds;
maxIntervalForSuppressingReportsMSecs =
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
}
}

Expand All @@ -798,9 +799,11 @@ private void initialize() {
periodInSecs,
periodInSecs,
this.timeUnitScale);
maxIntervalForSuppressingReportsMSecs =
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;

long resourceUsagePublishPeriodInMS = TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds);
long statsCacheInMS = resourceUsagePublishPeriodInMS * 2;
topicProduceStats = newStatsCache(statsCacheInMS);
topicConsumeStats = newStatsCache(statsCacheInMS);
replicationDispatchStats = newStatsCache(statsCacheInMS);
}

private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
Expand All @@ -819,6 +822,13 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
}
}

@VisibleForTesting
protected Cache<String, BytesAndMessagesCount> newStatsCache(long durationMS) {
return Caffeine.newBuilder()
.expireAfterAccess(durationMS, TimeUnit.MILLISECONDS)
.build();
}

private static final Logger log = LoggerFactory.getLogger(ResourceGroupService.class);

@Getter
Expand All @@ -841,10 +851,9 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
private ConcurrentHashMap<TopicName, ResourceGroup> topicToRGsMap = new ConcurrentHashMap<>();

// Maps to maintain the usage per topic, in produce/consume directions.
private ConcurrentHashMap<String, BytesAndMessagesCount> topicProduceStats = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, BytesAndMessagesCount> topicConsumeStats = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, BytesAndMessagesCount> replicationDispatchStats = new ConcurrentHashMap<>();

private Cache<String, BytesAndMessagesCount> topicProduceStats;
private Cache<String, BytesAndMessagesCount> topicConsumeStats;
private Cache<String, BytesAndMessagesCount> replicationDispatchStats;

// The task that periodically re-calculates the quota budget for local usage.
private ScheduledFuture<?> aggregateLocalUsagePeriodicTask;
Expand All @@ -857,22 +866,6 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
// Allow a pluggable scale on time units; for testing periodic functionality.
private TimeUnit timeUnitScale;

// The maximum number of successive rounds that we can suppress reporting local usage, because there was no
// substantial change from the prior round. This is to ensure the reporting does not become too chatty.
// Set this value to one more than the cadence of sending reports; e.g., if you want to send every 3rd report,
// set the value to 4.
// Setting this to 0 will make us report in every round.
// Don't set to negative values; behavior will be "undefined".
protected static final int MaxUsageReportSuppressRounds = 5;

// Convenient shorthand, for MaxUsageReportSuppressRounds converted to a time interval in milliseconds.
protected static long maxIntervalForSuppressingReportsMSecs;

// The percentage difference that is considered "within limits" to suppress usage reporting.
// Setting this to 0 will also make us report in every round.
// Don't set it to negative values; behavior will be "undefined".
protected static final float UsageReportSuppressionTolerancePercentage = 5;

// Labels for the various counters used here.
private static final String[] resourceGroupLabel = {"ResourceGroup"};
private static final String[] resourceGroupMonitoringclassLabels = {"ResourceGroup", "MonitoringClass"};
Expand Down Expand Up @@ -953,15 +946,20 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
.register();

@VisibleForTesting
ConcurrentHashMap getTopicConsumeStats() {
Cache<String, BytesAndMessagesCount> getTopicConsumeStats() {
return this.topicConsumeStats;
}

@VisibleForTesting
ConcurrentHashMap getTopicProduceStats() {
Cache<String, BytesAndMessagesCount> getTopicProduceStats() {
return this.topicProduceStats;
}

@VisibleForTesting
Cache<String, BytesAndMessagesCount> getReplicationDispatchStats() {
return this.replicationDispatchStats;
}

@VisibleForTesting
ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() {
return this.aggregateLocalUsagePeriodicTask;
Expand Down
Loading
Loading