Skip to content

Commit

Permalink
[fix][broker] usedLocallySinceLastReport should always be reset (apac…
Browse files Browse the repository at this point in the history
…he#22672)

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece authored May 9, 2024
1 parent 88feb87 commit 8f015d8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -458,14 +458,13 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas

bytesUsed = monEntity.usedLocallySinceLastReport.bytes;
messagesUsed = monEntity.usedLocallySinceLastReport.messages;

monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
if (sendReport) {
p.setBytesPerPeriod(bytesUsed);
p.setMessagesPerPeriod(messagesUsed);
monEntity.lastReportedValues.bytes = bytesUsed;
monEntity.lastReportedValues.messages = messagesUsed;
monEntity.numSuppressedUsageReports = 0;
monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
monEntity.totalUsedLocally.bytes += bytesUsed;
monEntity.totalUsedLocally.messages += messagesUsed;
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,34 +72,50 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
rgConfig.setPublishRateInMsgs(2000);
service.resourceGroupCreate(rgName, rgConfig);

org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName);
BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount();
bytesAndMessagesCount.bytes = 20;
bytesAndMessagesCount.messages = 10;
resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount);

org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName);
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount);
}

// Case1: Suppress report ResourceUsage.
needReport.set(false);
ResourceUsage resourceUsage = new ResourceUsage();
resourceGroup.rgFillResourceUsage(resourceUsage);
assertFalse(resourceUsage.hasDispatch());
assertFalse(resourceUsage.hasPublish());
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
PerMonitoringClassFields monitoredEntity =
resourceGroup.getMonitoredEntity(value);
assertEquals(monitoredEntity.usedLocallySinceLastReport.messages, 0);
assertEquals(monitoredEntity.usedLocallySinceLastReport.bytes, 0);
assertEquals(monitoredEntity.totalUsedLocally.messages, 0);
assertEquals(monitoredEntity.totalUsedLocally.bytes, 0);
assertEquals(monitoredEntity.lastReportedValues.messages, 0);
assertEquals(monitoredEntity.lastReportedValues.bytes, 0);
}

PerMonitoringClassFields publishMonitoredEntity =
resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes);
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0);
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0);
assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0);
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0);

// Case2: Report ResourceUsage.
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount);
}
needReport.set(true);
resourceUsage = new ResourceUsage();
resourceGroup.rgFillResourceUsage(resourceUsage);
assertTrue(resourceUsage.hasDispatch());
assertTrue(resourceUsage.hasPublish());
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0);
assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0);
assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes);
assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages);
assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes);
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
PerMonitoringClassFields monitoredEntity =
resourceGroup.getMonitoredEntity(value);
assertEquals(monitoredEntity.usedLocallySinceLastReport.messages, 0);
assertEquals(monitoredEntity.usedLocallySinceLastReport.bytes, 0);
assertEquals(monitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages);
assertEquals(monitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes);
assertEquals(monitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages);
assertEquals(monitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes);
}
}
}

0 comments on commit 8f015d8

Please sign in to comment.