diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/CounterMetric.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/CounterMetric.java index ce278b559e..f33d1e84c4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/CounterMetric.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/CounterMetric.java @@ -14,6 +14,7 @@ import com.automq.stream.s3.metrics.MetricsConfig; import com.automq.stream.s3.metrics.MetricsLevel; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import io.opentelemetry.api.common.Attributes; @@ -21,6 +22,7 @@ public class CounterMetric extends ConfigurableMetric { private final Supplier longCounterSupplier; + private final AtomicLong total = new AtomicLong(0); public CounterMetric(MetricsConfig metricsConfig, Supplier longCounterSupplier) { super(metricsConfig, Attributes.empty()); @@ -33,10 +35,15 @@ public CounterMetric(MetricsConfig metricsConfig, Attributes extraAttributes, Su } public boolean add(MetricsLevel metricsLevel, long value) { + total.addAndGet(value); if (metricsLevel.isWithin(this.metricsLevel)) { longCounterSupplier.get().add(value, attributes); return true; } return false; } + + public long getValue() { + return total.get(); + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java index 1edf1abf74..483be75e40 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java @@ -146,7 +146,7 @@ private void forceConsume(long size) { public CompletableFuture consume(ThrottleStrategy throttleStrategy, long size) { CompletableFuture cf = new CompletableFuture<>(); - cf.whenComplete((v, e) -> logMetrics(size, throttleStrategy)); + cf.whenComplete((v, e) -> NetworkStats.getInstance().networkUsageTotalStats(type, throttleStrategy).add(MetricsLevel.INFO, size)); if (Objects.requireNonNull(throttleStrategy) == ThrottleStrategy.BYPASS) { forceConsume(size); cf.complete(null); @@ -171,10 +171,6 @@ private void reduceToken(long size) { this.availableTokens = Math.max(-maxTokens, availableTokens - size); } - private void logMetrics(long size, ThrottleStrategy strategy) { - NetworkStats.getInstance().networkUsageTotalStats(type, strategy).add(MetricsLevel.INFO, size); - } - public enum Type { INBOUND("Inbound"), OUTBOUND("Outbound");