diff --git a/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java b/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java
index 11ebf5227..ae51b0410 100644
--- a/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java
+++ b/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java
@@ -28,7 +28,6 @@
import com.automq.rocketmq.store.MessageStoreImpl;
import com.automq.rocketmq.store.metrics.StoreMetricsManager;
import com.automq.rocketmq.store.metrics.StreamMetricsManager;
-import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.google.common.base.Splitter;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.common.Attributes;
@@ -104,7 +103,6 @@ public MetricsExporter(BrokerConfig brokerConfig, MessageStoreImpl messageStore,
this.streamMetricsManager = new StreamMetricsManager();
this.topicMetricsManager = new TopicMetricsManager(metadataStore, s3MetadataService);
init(resource, tracerProvider);
- S3StreamMetricsRegistry.setMetricsGroup(this.streamMetricsManager);
}
public static AttributesBuilder newAttributesBuilder() {
diff --git a/pom.xml b/pom.xml
index f332c83d0..e8872df75 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
32.1.3-jre
2.0.9
2.2
- 0.6.9-SNAPSHOT
+ 0.6.10-SNAPSHOT
23.5.26
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index 086fb5aad..69c5dcf86 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -22,7 +22,7 @@
4.0.0
com.automq.elasticstream
s3stream
- 0.6.9-SNAPSHOT
+ 0.6.10-SNAPSHOT
5.5.0
5.10.0
@@ -34,6 +34,7 @@
17
17
UTF-8
+ 1.32.0-alpha
@@ -112,6 +113,11 @@
jackson-databind
2.16.0
+
+ io.opentelemetry.instrumentation
+ opentelemetry-runtime-telemetry-java17
+ ${alapha.opentelemetry.version}
+
diff --git a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
index 73221fb29..647126057 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
@@ -17,7 +17,7 @@
package com.automq.stream.s3;
-import com.automq.stream.s3.metrics.stats.ByteBufMetricsStats;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
@@ -45,11 +45,11 @@ public static ByteBuf byteBuffer(int initCapacity) {
public static ByteBuf byteBuffer(int initCapacity, String name) {
try {
if (name != null) {
- ByteBufMetricsStats.getHistogram(name).update(initCapacity);
+ S3StreamMetricsManager.recordAllocateByteBufSize(initCapacity, name);
}
return ALLOC.directBuffer(initCapacity);
} catch (OutOfMemoryError e) {
- for (;;) {
+ for (; ; ) {
int freedBytes = 0;
for (OOMHandler handler : OOM_HANDLERS) {
freedBytes += handler.handle(initCapacity);
@@ -77,6 +77,7 @@ public static void registerOOMHandlers(OOMHandler handler) {
public interface OOMHandler {
/**
* Try handle OOM exception.
+ *
* @param memoryRequired the memory required
* @return freed memory.
*/
diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
index 84a18949c..81375c9e7 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
@@ -17,13 +17,13 @@
package com.automq.stream.s3;
+import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectStreamRange;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.operator.Writer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
-import com.automq.stream.s3.metadata.ObjectUtils;
import java.util.ArrayList;
import java.util.Collections;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index ba0a70a1f..c79153093 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -24,9 +24,9 @@
import com.automq.stream.s3.cache.ReadDataBlock;
import com.automq.stream.s3.cache.S3BlockCache;
import com.automq.stream.s3.metadata.StreamMetadata;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
@@ -259,7 +259,7 @@ public CompletableFuture append(StreamRecordBatch streamRecord) {
append0(writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
- OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
});
return cf;
}
@@ -279,7 +279,7 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
if (!fromBackoff) {
backoffRecords.offer(request);
}
- OperationMetricsStats.getCounter(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).inc();
+ S3StreamMetricsManager.recordOperationNum(1, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
@@ -344,7 +344,7 @@ public CompletableFuture read(long streamId, long startOffset, lo
TimerUtil timerUtil = new TimerUtil();
CompletableFuture cf = new CompletableFuture<>();
FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, readOptions), cf);
- cf.whenComplete((nil, ex) -> OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
+ cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
return cf;
}
@@ -363,6 +363,7 @@ private CompletableFuture read0(long streamId, long startOffset,
endOffset = logCacheRecords.get(0).getBaseOffset();
}
Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES);
+ long finalEndOffset = endOffset;
return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(readDataBlock -> {
List rst = new ArrayList<>(readDataBlock.getRecords());
int remainingBytesSize = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum();
@@ -383,7 +384,7 @@ private CompletableFuture read0(long streamId, long startOffset,
timeout.cancel();
if (ex != null) {
LOGGER.error("read from block cache failed, stream={}, {}-{}, maxBytes: {}",
- streamId, startOffset, maxBytes, ex);
+ streamId, startOffset, finalEndOffset, maxBytes, ex);
logCacheRecords.forEach(StreamRecordBatch::release);
}
});
@@ -396,7 +397,7 @@ private void continuousCheck(List records) {
expectStartOffset = record.getLastOffset();
} else {
throw new IllegalArgumentException(String.format("Continuous check failed, expect offset: %d," +
- " actual: %d, records: %s", expectStartOffset, record.getBaseOffset() ,records));
+ " actual: %d, records: %s", expectStartOffset, record.getBaseOffset(), records));
}
}
}
@@ -444,7 +445,7 @@ private void handleAppendCallback(WalWriteRequest request) {
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
- OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_APPEND_CALLBACK).update(timer.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
}
private Lock getStreamCallbackLock(long streamId) {
@@ -488,7 +489,7 @@ CompletableFuture uploadDeltaWAL(DeltaWALUploadTaskContext context) {
inflightWALUploadTasks.add(cf);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
cf.whenComplete((nil, ex) -> {
- OperationMetricsStats.getHistogram(S3Operation.UPLOAD_STORAGE_WAL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_STORAGE_WAL);
inflightWALUploadTasks.remove(cf);
if (ex != null) {
LOGGER.error("upload delta WAL fail", ex);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
index f2e49c2db..619029294 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -29,9 +29,9 @@
import com.automq.stream.api.exceptions.FastReadFailFastException;
import com.automq.stream.api.exceptions.StreamClientException;
import com.automq.stream.s3.cache.CacheAccessType;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.streams.StreamManager;
@@ -141,7 +141,7 @@ public long nextOffset() {
public CompletableFuture append(RecordBatch recordBatch) {
TimerUtil timerUtil = new TimerUtil();
writeLock.lock();
- OperationMetricsStats.getHistogram(S3Operation.APPEND_STREAM_WRITE_LOCK).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
try {
CompletableFuture cf = exec(() -> {
if (networkInboundLimiter != null) {
@@ -151,7 +151,7 @@ public CompletableFuture append(RecordBatch recordBatch) {
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
- OperationMetricsStats.getHistogram(S3Operation.APPEND_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM);
pendingAppends.remove(cf);
});
return cf;
@@ -188,12 +188,12 @@ private CompletableFuture append0(RecordBatch recordBatch) {
public CompletableFuture fetch(long startOffset, long endOffset, int maxBytes, ReadOptions readOptions) {
TimerUtil timerUtil = new TimerUtil();
readLock.lock();
- OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM_READ_LOCK).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
try {
CompletableFuture cf = exec(() -> fetch0(startOffset, endOffset, maxBytes, readOptions), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
- OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
if (ex != null) {
Throwable cause = FutureUtil.cause(ex);
if (!(cause instanceof FastReadFailFastException)) {
@@ -255,7 +255,7 @@ public CompletableFuture trim(long newStartOffset) {
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
- OperationMetricsStats.getHistogram(S3Operation.TRIM_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.TRIM_STREAM);
});
return cf;
}, LOGGER, "trim");
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
index 565ae9060..9aed54d36 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
@@ -21,9 +21,9 @@
import com.automq.stream.api.OpenStreamOptions;
import com.automq.stream.api.Stream;
import com.automq.stream.api.StreamClient;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
@@ -84,7 +84,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
public CompletableFuture createAndOpenStream(CreateStreamOptions options) {
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
- OperationMetricsStats.getHistogram(S3Operation.CREATE_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_STREAM);
return openStream0(streamId, options.epoch());
}), LOGGER, "createAndOpenStream");
}
@@ -139,7 +139,7 @@ private CompletableFuture openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
- OperationMetricsStats.getHistogram(S3Operation.OPEN_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.OPEN_STREAM);
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
.compactedStreamObjectMaxSizeInBytes(config.streamObjectCompactionMaxSizeBytes())
.eligibleStreamObjectLivingTimeInMs(config.streamObjectCompactionLivingTimeMinutes() * 60L * 1000)
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
index 7f1e5dede..f4f72ed63 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -18,9 +18,9 @@
package com.automq.stream.s3.cache;
import com.automq.stream.s3.Config;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
@@ -86,7 +86,7 @@ public CompletableFuture read(long streamId, long startOffset, lo
FutureUtil.exec(() -> {
read0(streamId, startOffset, endOffset, maxBytes, agent, uuid).whenComplete((ret, ex) -> {
if (ex != null) {
- LOGGER.error("read {} [{}, {}) from block cache fail", streamId, startOffset, endOffset, ex);
+ LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail", streamId, startOffset, endOffset, maxBytes, ex);
readCf.completeExceptionally(ex);
this.inflightReadThrottle.release(uuid);
return;
@@ -95,16 +95,13 @@ public CompletableFuture read(long streamId, long startOffset, lo
this.readAheadManager.updateReadResult(streamId, startOffset,
ret.getRecords().get(ret.getRecords().size() - 1).getLastOffset(), totalReturnedSize);
- if (ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT) {
- OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE).inc();
- } else {
- OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE_MISS).inc();
- }
+ long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
+ boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT;
+ S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE_HIT, isCacheHit);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {} ",
ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, totalReturnedSize);
}
- OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_BLOCK_CACHE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
readCf.complete(ret);
this.inflightReadThrottle.release(uuid);
});
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java b/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java
index 624ff7fa7..2bc73fa39 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java
@@ -17,7 +17,7 @@
package com.automq.stream.s3.cache;
-import com.automq.stream.s3.metrics.stats.BlockCacheMetricsStats;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
import com.automq.stream.utils.Utils;
@@ -56,7 +56,7 @@ public InflightReadThrottle(int maxInflightReadBytes) {
this.maxInflightReadBytes = maxInflightReadBytes;
this.remainingInflightReadBytes = maxInflightReadBytes;
executorService.execute(this);
- BlockCacheMetricsStats.registerAvailableInflightReadSize(this::getRemainingInflightReadBytes);
+ S3StreamMetricsManager.registerInflightReadSizeLimiterGauge(this::getRemainingInflightReadBytes);
}
public void shutdown() {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
index 371db9bb6..f139fdca5 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -17,9 +17,9 @@
package com.automq.stream.s3.cache;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.utils.biniarysearch.StreamRecordBatchList;
@@ -85,7 +85,7 @@ public boolean put(StreamRecordBatch recordBatch) {
tryRealFree();
size.addAndGet(recordBatch.size());
boolean full = activeBlock.put(recordBatch);
- OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_LOG_CACHE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_LOG_CACHE);
return full;
}
@@ -124,12 +124,9 @@ public List get(long streamId, long startOffset, long endOffs
readLock.unlock();
}
- if (!records.isEmpty() && records.get(0).getBaseOffset() <= startOffset) {
- OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE).inc();
- } else {
- OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE_MISS).inc();
- }
- OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_LOG_CACHE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
+ boolean isCacheHit = !records.isEmpty() && records.get(0).getBaseOffset() <= startOffset;
+ S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE_HIT, isCacheHit);
return records;
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
index ff2645e46..d6b10448e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
@@ -17,8 +17,8 @@
package com.automq.stream.s3.cache;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
-import com.automq.stream.s3.metrics.stats.BlockCacheMetricsStats;
import com.automq.stream.utils.LogContext;
import com.google.common.base.Objects;
import org.apache.commons.lang3.tuple.Pair;
@@ -99,7 +99,7 @@ public void updateReadAheadResult(long readAheadEndOffset, int readAheadSize) {
lock.lock();
this.readAheadEndOffset = readAheadEndOffset;
this.lastReadAheadSize = readAheadSize;
- BlockCacheMetricsStats.getOrCreateReadAheadSizeHist().update(readAheadSize);
+ S3StreamMetricsManager.recordReadAheadSize(readAheadSize);
if (logger.isDebugEnabled()) {
logger.debug("update read ahead offset {}, size: {}, lastReadOffset: {}", readAheadEndOffset, readAheadSize, lastReadOffset);
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
index 9980b5c4a..90407f245 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java
@@ -19,12 +19,13 @@
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.StreamDataBlock;
+import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.S3Operator;
import io.github.bucket4j.Bucket;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
-import com.automq.stream.s3.metadata.S3ObjectMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,6 +187,11 @@ private void readContinuousBlocks0(List streamDataBlocks) {
}
private CompletableFuture rangeRead(long start, long end) {
+ return rangeRead0(start, end).whenComplete((ret, ex) ->
+ S3StreamMetricsManager.recordCompactionReadSizeIn(ret.readableBytes()));
+ }
+
+ private CompletableFuture rangeRead0(long start, long end) {
if (throttleBucket == null) {
return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
// convert heap buffer to direct buffer
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
index 8f8f80566..72a010e13 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
@@ -19,12 +19,13 @@
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.StreamDataBlock;
+import com.automq.stream.s3.metadata.ObjectUtils;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.operator.Writer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
-import com.automq.stream.s3.metadata.ObjectUtils;
import java.util.LinkedList;
import java.util.List;
@@ -61,7 +62,9 @@ public long getObjectId() {
}
public void write(StreamDataBlock dataBlock) {
- waitingUploadBlockCfs.put(dataBlock, new CompletableFuture<>());
+ CompletableFuture cf = new CompletableFuture<>();
+ cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordCompactionWriteSize(dataBlock.getBlockSize()));
+ waitingUploadBlockCfs.put(dataBlock, cf);
waitingUploadBlocks.add(dataBlock);
long waitingUploadSize = waitingUploadBlocks.stream().mapToLong(StreamDataBlock::getBlockSize).sum();
if (waitingUploadSize >= partSizeThreshold) {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java
index 787fc8812..48082bb2f 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java
@@ -22,9 +22,9 @@
import com.automq.stream.s3.metadata.S3ObjectType;
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metadata.StreamOffsetRange;
-import com.automq.stream.s3.objects.CompactStreamObjectRequest;
import com.automq.stream.s3.objects.CommitStreamSetObjectRequest;
import com.automq.stream.s3.objects.CommitStreamSetObjectResponse;
+import com.automq.stream.s3.objects.CompactStreamObjectRequest;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.objects.ObjectStreamRange;
import com.automq.stream.s3.objects.StreamObject;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/Histogram.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/Histogram.java
deleted file mode 100644
index 738e1dacb..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/Histogram.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.metrics;
-
-public interface Histogram {
- void update(long value);
- long count();
- double mean();
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopHistogram.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopHistogram.java
deleted file mode 100644
index d9a886f30..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopHistogram.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.metrics;
-
-public class NoopHistogram implements Histogram {
- @Override
- public void update(long value) {
-
- }
-
- @Override
- public long count() {
- return 0;
- }
-
- @Override
- public double mean() {
- return 0;
- }
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopCounter.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopLongCounter.java
similarity index 72%
rename from s3stream/src/main/java/com/automq/stream/s3/metrics/NoopCounter.java
rename to s3stream/src/main/java/com/automq/stream/s3/metrics/NoopLongCounter.java
index beac122e0..fe04076e9 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopCounter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopLongCounter.java
@@ -17,19 +17,23 @@
package com.automq.stream.s3.metrics;
-public class NoopCounter implements Counter {
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.context.Context;
+
+public class NoopLongCounter implements LongCounter {
@Override
- public void inc() {
+ public void add(long l) {
}
@Override
- public void inc(long n) {
+ public void add(long l, Attributes attributes) {
}
@Override
- public long count() {
- return 0;
+ public void add(long l, Attributes attributes, Context context) {
+
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopLongHistogram.java
similarity index 71%
rename from s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java
rename to s3stream/src/main/java/com/automq/stream/s3/metrics/NoopLongHistogram.java
index 3a250b302..a8cb93aca 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopS3StreamMetricsGroup.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopLongHistogram.java
@@ -17,23 +17,23 @@
package com.automq.stream.s3.metrics;
-import java.util.Map;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.context.Context;
-public class NoopS3StreamMetricsGroup implements S3StreamMetricsGroup {
+public class NoopLongHistogram implements LongHistogram {
@Override
- public Counter newCounter(String name, Map tags) {
- return null;
- }
+ public void record(long l) {
- @Override
- public Histogram newHistogram(String name, Map tags) {
- return null;
}
@Override
- public void newGauge(String name, Map tags, Gauge gauge) {
+ public void record(long l, Attributes attributes) {
}
+ @Override
+ public void record(long l, Attributes attributes, Context context) {
+ }
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableLongGauge.java
similarity index 86%
rename from s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java
rename to s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableLongGauge.java
index fc09f9d14..c228462fe 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/Counter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableLongGauge.java
@@ -17,9 +17,7 @@
package com.automq.stream.s3.metrics;
-public interface Counter {
- String SUFFIX = "_total";
- void inc();
- void inc(long n);
- long count();
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+
+public class NoopObservableLongGauge implements ObservableLongGauge {
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
new file mode 100644
index 000000000..08d7b8c19
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.metrics;
+
+import io.opentelemetry.api.common.AttributeKey;
+
+public class S3StreamMetricsConstant {
+ public static final String UPLOAD_SIZE_METRIC_NAME = "upload_size_total";
+ public static final String DOWNLOAD_SIZE_METRIC_NAME = "download_size_total";
+ public static final String OPERATION_COUNT_METRIC_NAME = "operation_count_total";
+ public static final String OPERATION_LATENCY_METRIC_NAME = "operation_latency";
+ public static final String OBJECT_COUNT_METRIC_NAME = "object_count_total";
+ public static final String OBJECT_STAGE_COST_METRIC_NAME = "object_stage_cost";
+ public static final String OBJECT_UPLOAD_SIZE_METRIC_NAME = "object_upload_size";
+ public static final String OBJECT_DOWNLOAD_SIZE_METRIC_NAME = "object_download_size";
+ public static final String NETWORK_INBOUND_USAGE_METRIC_NAME = "network_inbound_usage_total";
+ public static final String NETWORK_OUTBOUND_USAGE_METRIC_NAME = "network_outbound_usage_total";
+ public static final String NETWORK_INBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME = "network_inbound_available_bandwidth";
+ public static final String NETWORK_OUTBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME = "network_outbound_available_bandwidth";
+ public static final String NETWORK_INBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME = "network_inbound_limiter_queue_size";
+ public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME = "network_outbound_limiter_queue_size";
+ public static final String ALLOCATE_BYTE_BUF_SIZE_METRIC_NAME = "allocate_byte_buf_size";
+ public static final String READ_AHEAD_SIZE_METRIC_NAME = "read_ahead_size";
+ public static final String AVAILABLE_INFLIGHT_READ_AHEAD_SIZE_METRIC_NAME = "available_inflight_read_ahead_size";
+ public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size_total";
+ public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size_total";
+ public static final AttributeKey LABEL_OPERATION_TYPE = AttributeKey.stringKey("operation_type");
+ public static final AttributeKey LABEL_OPERATION_NAME = AttributeKey.stringKey("operation_name");
+ public static final AttributeKey LABEL_APPEND_WAL_STAGE = AttributeKey.stringKey("stage");
+ public static final AttributeKey LABEL_CACHE_STATUS = AttributeKey.stringKey("status");
+ public static final AttributeKey LABEL_OBJECT_STAGE = AttributeKey.stringKey("stage");
+ public static final AttributeKey LABEL_ALLOCATE_BYTE_BUF_SOURCE = AttributeKey.stringKey("source");
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java
deleted file mode 100644
index cedf24747..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsGroup.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.metrics;
-
-import java.util.Map;
-
-public interface S3StreamMetricsGroup {
- Counter newCounter(String name, Map tags);
-
- Histogram newHistogram(String name, Map tags);
-
- void newGauge(String name, Map tags, Gauge gauge);
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
new file mode 100644
index 000000000..bc28b29e2
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.metrics;
+
+import com.automq.stream.s3.metrics.operations.S3MetricsType;
+import com.automq.stream.s3.metrics.operations.S3ObjectStage;
+import com.automq.stream.s3.metrics.operations.S3Operation;
+import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+
+import java.util.function.Supplier;
+
+public class S3StreamMetricsManager {
+ private static LongCounter s3DownloadSizeInTotal = new NoopLongCounter();
+ private static LongCounter s3UploadSizeInTotal = new NoopLongCounter();
+ private static LongCounter operationNumInTotal = new NoopLongCounter();
+ private static LongHistogram operationLatency = new NoopLongHistogram();
+ private static LongCounter objectNumInTotal = new NoopLongCounter();
+ private static LongHistogram objectStageCost = new NoopLongHistogram();
+ private static LongHistogram objectUploadSize = new NoopLongHistogram();
+ private static LongHistogram objectDownloadSize = new NoopLongHistogram();
+ private static LongCounter networkInboundUsageInTotal = new NoopLongCounter();
+ private static LongCounter networkOutboundUsageInTotal = new NoopLongCounter();
+ private static ObservableLongGauge networkInboundAvailableBandwidth = new NoopObservableLongGauge();
+ private static ObservableLongGauge networkOutboundAvailableBandwidth = new NoopObservableLongGauge();
+ private static ObservableLongGauge networkInboundLimiterQueueSize = new NoopObservableLongGauge();
+ private static ObservableLongGauge networkOutboundLimiterQueueSize = new NoopObservableLongGauge();
+ private static LongHistogram allocateByteBufSize = new NoopLongHistogram();
+ private static LongHistogram readAheadSize = new NoopLongHistogram();
+ private static ObservableLongGauge availableInflightReadAheadSize = new NoopObservableLongGauge();
+ private static LongCounter compactionReadSizeInTotal = new NoopLongCounter();
+ private static LongCounter compactionWriteSizeInTotal = new NoopLongCounter();
+ private static Gauge networkInboundAvailableBandwidthValue = new NoopGauge();
+ private static Gauge networkOutboundAvailableBandwidthValue = new NoopGauge();
+ private static Gauge networkInboundLimiterQueueSizeValue = new NoopGauge();
+ private static Gauge networkOutboundLimiterQueueSizeValue = new NoopGauge();
+ private static Gauge availableInflightReadAheadSizeValue = new NoopGauge();
+ private static Supplier attributesBuilderSupplier = null;
+
+ public static void initAttributesBuilder(Supplier attributesBuilderSupplier) {
+ S3StreamMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
+ }
+
+ public static void initMetrics(Meter meter) {
+ initMetrics(meter, "");
+ }
+
+ public static void initMetrics(Meter meter, String prefix) {
+ s3DownloadSizeInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.DOWNLOAD_SIZE_METRIC_NAME)
+ .setDescription("S3 download size")
+ .setUnit("bytes")
+ .build();
+ s3UploadSizeInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.UPLOAD_SIZE_METRIC_NAME)
+ .setDescription("S3 upload size")
+ .setUnit("bytes")
+ .build();
+ operationNumInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.OPERATION_COUNT_METRIC_NAME)
+ .setDescription("Operations count")
+ .build();
+ operationLatency = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OPERATION_LATENCY_METRIC_NAME)
+ .setDescription("Operations latency")
+ .setUnit("nanoseconds")
+ .ofLongs()
+ .build();
+ objectNumInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.OBJECT_COUNT_METRIC_NAME)
+ .setDescription("Objects count")
+ .build();
+ objectStageCost = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OBJECT_STAGE_COST_METRIC_NAME)
+ .setDescription("Objects stage cost")
+ .setUnit("nanoseconds")
+ .ofLongs()
+ .build();
+ objectUploadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OBJECT_UPLOAD_SIZE_METRIC_NAME)
+ .setDescription("Objects upload size")
+ .setUnit("bytes")
+ .ofLongs()
+ .build();
+ objectDownloadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OBJECT_DOWNLOAD_SIZE_METRIC_NAME)
+ .setDescription("Objects download size")
+ .setUnit("bytes")
+ .ofLongs()
+ .build();
+ networkInboundUsageInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_USAGE_METRIC_NAME)
+ .setDescription("Network inbound usage")
+ .setUnit("bytes")
+ .build();
+ networkOutboundUsageInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_USAGE_METRIC_NAME)
+ .setDescription("Network outbound usage")
+ .setUnit("bytes")
+ .build();
+ networkInboundAvailableBandwidth = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME)
+ .setDescription("Network inbound available bandwidth")
+ .setUnit("bytes")
+ .ofLongs()
+ .buildWithCallback(result -> result.record(networkInboundAvailableBandwidthValue.value(), newAttributesBuilder().build()));
+ networkOutboundAvailableBandwidth = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME)
+ .setDescription("Network outbound available bandwidth")
+ .setUnit("bytes")
+ .ofLongs()
+ .buildWithCallback(result -> result.record(networkOutboundAvailableBandwidthValue.value(), newAttributesBuilder().build()));
+ networkInboundLimiterQueueSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME)
+ .setDescription("Network inbound limiter queue size")
+ .ofLongs()
+ .buildWithCallback(result -> result.record(networkInboundLimiterQueueSizeValue.value(), newAttributesBuilder().build()));
+ networkOutboundLimiterQueueSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME)
+ .setDescription("Network outbound limiter queue size")
+ .ofLongs()
+ .buildWithCallback(result -> result.record(networkOutboundLimiterQueueSizeValue.value(), newAttributesBuilder().build()));
+ allocateByteBufSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.ALLOCATE_BYTE_BUF_SIZE_METRIC_NAME)
+ .setDescription("Allocate byte buf size")
+ .setUnit("bytes")
+ .ofLongs()
+ .build();
+ readAheadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.READ_AHEAD_SIZE_METRIC_NAME)
+ .setDescription("Read ahead size")
+ .setUnit("bytes")
+ .ofLongs()
+ .build();
+ availableInflightReadAheadSize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.AVAILABLE_INFLIGHT_READ_AHEAD_SIZE_METRIC_NAME)
+ .setDescription("Available inflight read ahead size")
+ .setUnit("bytes")
+ .ofLongs()
+ .buildWithCallback(result -> result.record(availableInflightReadAheadSizeValue.value(), newAttributesBuilder().build()));
+ compactionReadSizeInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.COMPACTION_READ_SIZE_METRIC_NAME)
+ .setDescription("Compaction read size")
+ .setUnit("bytes")
+ .build();
+ compactionWriteSizeInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.COMPACTION_WRITE_SIZE_METRIC_NAME)
+ .setDescription("Compaction write size")
+ .setUnit("bytes")
+ .build();
+ }
+
+ private static AttributesBuilder newAttributesBuilder() {
+ if (attributesBuilderSupplier != null) {
+ return attributesBuilderSupplier.get();
+ }
+ return Attributes.builder();
+ }
+
+ public static void registerNetworkLimiterGauge(AsyncNetworkBandwidthLimiter.Type type, Gauge networkAvailableBandwidthValue, Gauge networkLimiterQueueSizeValue) {
+ switch (type) {
+ case INBOUND -> {
+ S3StreamMetricsManager.networkInboundAvailableBandwidthValue = networkAvailableBandwidthValue;
+ S3StreamMetricsManager.networkInboundLimiterQueueSizeValue = networkLimiterQueueSizeValue;
+ }
+ case OUTBOUND -> {
+ S3StreamMetricsManager.networkOutboundAvailableBandwidthValue = networkAvailableBandwidthValue;
+ S3StreamMetricsManager.networkOutboundLimiterQueueSizeValue = networkLimiterQueueSizeValue;
+ }
+ }
+ }
+
+ public static void registerInflightReadSizeLimiterGauge(Gauge availableInflightReadAheadSizeValue) {
+ S3StreamMetricsManager.availableInflightReadAheadSizeValue = availableInflightReadAheadSizeValue;
+ }
+
+ public static void recordS3UploadSize(long value) {
+ s3UploadSizeInTotal.add(value, newAttributesBuilder().build());
+ }
+
+ public static void recordS3DownloadSize(long value) {
+ s3DownloadSizeInTotal.add(value, newAttributesBuilder().build());
+ }
+
+ public static void recordOperationNum(long value, S3Operation operation) {
+ Attributes attributes = newAttributesBuilder()
+ .put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, operation.getType().getName())
+ .put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName())
+ .build();
+ operationNumInTotal.add(value, attributes);
+ }
+
+ public static void recordOperationLatency(long value, S3Operation operation) {
+ Attributes attributes = newAttributesBuilder()
+ .put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, operation.getType().getName())
+ .put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName())
+ .build();
+ operationLatency.record(value, attributes);
+ }
+
+ public static void recordAppendWALLatency(long value, String stage) {
+ Attributes attributes = newAttributesBuilder()
+ .put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, S3MetricsType.S3Storage.getName())
+ .put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, S3Operation.APPEND_STORAGE_WAL.getName())
+ .put(S3StreamMetricsConstant.LABEL_APPEND_WAL_STAGE, stage)
+ .build();
+ operationLatency.record(value, attributes);
+ }
+
+ public static void recordReadCacheLatency(long value, S3Operation operation, boolean isCacheHit) {
+ Attributes attributes = newAttributesBuilder()
+ .put(S3StreamMetricsConstant.LABEL_OPERATION_TYPE, operation.getType().getName())
+ .put(S3StreamMetricsConstant.LABEL_OPERATION_NAME, operation.getName())
+ .put(S3StreamMetricsConstant.LABEL_CACHE_STATUS, isCacheHit ? "hit" : "miss")
+ .build();
+ operationLatency.record(value, attributes);
+ }
+
+ public static void recordObjectNum(long value) {
+ objectNumInTotal.add(value, newAttributesBuilder().build());
+ }
+
+ public static void recordObjectStageCost(long value, S3ObjectStage stage) {
+ Attributes attributes = newAttributesBuilder()
+ .put(S3StreamMetricsConstant.LABEL_OBJECT_STAGE, stage.getName())
+ .build();
+ objectStageCost.record(value, attributes);
+ }
+
+ public static void recordObjectUploadSize(long value) {
+ objectUploadSize.record(value, newAttributesBuilder().build());
+ }
+
+ public static void recordObjectDownloadSize(long value) {
+ objectDownloadSize.record(value, newAttributesBuilder().build());
+ }
+
+ public static void recordNetworkInboundUsage(long value) {
+ networkInboundUsageInTotal.add(value, newAttributesBuilder().build());
+ }
+
+ public static void recordNetworkOutboundUsage(long value) {
+ networkOutboundUsageInTotal.add(value, newAttributesBuilder().build());
+ }
+
+ public static void recordAllocateByteBufSize(long value, String source) {
+ Attributes attributes = newAttributesBuilder()
+ .put(S3StreamMetricsConstant.LABEL_ALLOCATE_BYTE_BUF_SOURCE, source)
+ .build();
+ allocateByteBufSize.record(value, attributes);
+ }
+
+ public static void recordReadAheadSize(long value) {
+ readAheadSize.record(value, newAttributesBuilder().build());
+ }
+
+ public static void recordCompactionReadSizeIn(long value) {
+ compactionReadSizeInTotal.add(value, newAttributesBuilder().build());
+ }
+
+ public static void recordCompactionWriteSize(long value) {
+ compactionWriteSizeInTotal.add(value, newAttributesBuilder().build());
+ }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsRegistry.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsRegistry.java
deleted file mode 100644
index a5f526681..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsRegistry.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.metrics;
-
-public class S3StreamMetricsRegistry {
- private static S3StreamMetricsGroup metricsGroup = new NoopS3StreamMetricsGroup();
-
- private S3StreamMetricsRegistry() {
-
- }
-
- public static void setMetricsGroup(S3StreamMetricsGroup metricsGroup) {
- S3StreamMetricsRegistry.metricsGroup = metricsGroup;
- }
-
- public static S3StreamMetricsGroup getMetricsGroup() {
- return metricsGroup;
- }
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java
index 9975b18ec..bd4d82339 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java
@@ -32,21 +32,14 @@ public enum S3Operation {
/* S3 storage operations start */
APPEND_STORAGE(S3MetricsType.S3Storage, "append"),
APPEND_STORAGE_WAL(S3MetricsType.S3Storage, "append_wal"),
- APPEND_STORAGE_WAL_BEFORE(S3MetricsType.S3Storage, "append_wal_before"),
- APPEND_STORAGE_WAL_AWAIT(S3MetricsType.S3Storage, "append_wal_await"),
- APPEND_STORAGE_WAL_WRITE(S3MetricsType.S3Storage, "append_wal_write"),
- APPEND_STORAGE_WAL_AFTER(S3MetricsType.S3Storage, "append_wal_after"),
- APPEND_STORAGE_WAL_BLOCK_POLLED(S3MetricsType.S3Storage, "append_wal_block_polled"),
APPEND_STORAGE_APPEND_CALLBACK(S3MetricsType.S3Storage, "append_callback"),
APPEND_STORAGE_WAL_FULL(S3MetricsType.S3Storage, "append_wal_full"),
APPEND_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "append_log_cache"),
APPEND_STORAGE_LOG_CACHE_FULL(S3MetricsType.S3Storage, "append_log_cache_full"),
UPLOAD_STORAGE_WAL(S3MetricsType.S3Storage, "upload_wal"),
READ_STORAGE(S3MetricsType.S3Storage, "read"),
- READ_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "read_log_cache"),
- READ_STORAGE_LOG_CACHE_MISS(S3MetricsType.S3Storage, "read_log_cache_miss"),
- READ_STORAGE_BLOCK_CACHE(S3MetricsType.S3Storage, "read_block_cache"),
- READ_STORAGE_BLOCK_CACHE_MISS(S3MetricsType.S3Storage, "read_block_cache_miss"),
+ READ_STORAGE_LOG_CACHE_HIT(S3MetricsType.S3Storage, "read_log_cache"),
+ READ_STORAGE_BLOCK_CACHE_HIT(S3MetricsType.S3Storage, "read_block_cache"),
/* S3 storage operations end */
/* S3 request operations start */
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/BlockCacheMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/BlockCacheMetricsStats.java
deleted file mode 100644
index 50edd7346..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/BlockCacheMetricsStats.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.metrics.stats;
-
-import com.automq.stream.s3.metrics.Gauge;
-import com.automq.stream.s3.metrics.Histogram;
-import com.automq.stream.s3.metrics.NoopHistogram;
-import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
-
-import java.util.Collections;
-
-public class BlockCacheMetricsStats {
- private static Histogram readAheadSizeHistogram = null;
-
- public static Histogram getOrCreateReadAheadSizeHist() {
- if (readAheadSizeHistogram == null) {
- readAheadSizeHistogram = S3StreamMetricsRegistry.getMetricsGroup()
- .newHistogram("read_ahead_size", Collections.emptyMap());
- }
- return readAheadSizeHistogram == null ? new NoopHistogram() : readAheadSizeHistogram;
- }
-
- public static void registerAvailableInflightReadSize(Gauge gauge) {
- S3StreamMetricsRegistry.getMetricsGroup().newGauge("available_inflight_read_size", Collections.emptyMap(), gauge);
- }
-
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java
deleted file mode 100644
index 486c3e6e5..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufMetricsStats.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.metrics.stats;
-
-import com.automq.stream.s3.metrics.Histogram;
-import com.automq.stream.s3.metrics.NoopHistogram;
-import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class ByteBufMetricsStats {
- private static final Map SOURCE_TO_HISTOGRAM = new ConcurrentHashMap<>();
-
- public static Histogram getHistogram(String source) {
- Histogram hist = SOURCE_TO_HISTOGRAM.computeIfAbsent(source, k -> {
- Map tags = Map.of("source", k);
- return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_stream_byte_buf_size", tags);
- });
- return hist == null ? new NoopHistogram() : hist;
- }
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java
deleted file mode 100644
index 6829b259b..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkMetricsStats.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.metrics.stats;
-
-import com.automq.stream.s3.metrics.Counter;
-import com.automq.stream.s3.metrics.Gauge;
-import com.automq.stream.s3.metrics.NoopCounter;
-import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
-import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
-
-import java.util.Collections;
-
-public class NetworkMetricsStats {
-
- public static Counter networkInboundUsageCounter = null;
-
- public static Counter networkOutboundUsageCounter = null;
-
- public static Counter getOrCreateNetworkInboundUsageCounter() {
- if (networkInboundUsageCounter == null) {
- networkInboundUsageCounter = S3StreamMetricsRegistry.getMetricsGroup()
- .newCounter("network_inbound_usage" + Counter.SUFFIX, Collections.emptyMap());
- }
- return networkInboundUsageCounter == null ? new NoopCounter() : networkInboundUsageCounter;
- }
-
- public static Counter getOrCreateNetworkOutboundUsageCounter() {
- if (networkOutboundUsageCounter == null) {
- networkOutboundUsageCounter = S3StreamMetricsRegistry.getMetricsGroup()
- .newCounter("network_outbound_usage" + Counter.SUFFIX, Collections.emptyMap());
- }
- return networkOutboundUsageCounter == null ? new NoopCounter() : networkOutboundUsageCounter;
- }
-
- public static void registerNetworkInboundAvailableBandwidth(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) {
- String metricName = String.format("network_%s_available_bandwidth", type.getName().toLowerCase());
- S3StreamMetricsRegistry.getMetricsGroup().newGauge(metricName, Collections.emptyMap(), gauge);
- }
-
- public static void registerNetworkLimiterQueueSize(AsyncNetworkBandwidthLimiter.Type type, Gauge gauge) {
- String metricName = String.format("network_%s_limiter_queue_size", type.getName().toLowerCase());
- S3StreamMetricsRegistry.getMetricsGroup().newGauge(metricName, Collections.emptyMap(), gauge);
- }
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java
deleted file mode 100644
index 017f887a8..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/OperationMetricsStats.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.metrics.stats;
-
-import com.automq.stream.s3.metrics.Counter;
-import com.automq.stream.s3.metrics.Histogram;
-import com.automq.stream.s3.metrics.NoopCounter;
-import com.automq.stream.s3.metrics.NoopHistogram;
-import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
-import com.automq.stream.s3.metrics.operations.S3Operation;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class OperationMetricsStats {
- private static final Map OPERATION_COUNTER_MAP = new ConcurrentHashMap<>();
- private static final Map OPERATION_HIST_MAP = new ConcurrentHashMap<>();
-
- public static Counter getCounter(S3Operation s3Operation) {
- return getOrCreateCounterMetrics(s3Operation);
- }
-
- public static Histogram getHistogram(S3Operation s3Operation) {
- return getOrCreateHistMetrics(s3Operation);
- }
-
- private static Counter getOrCreateCounterMetrics(S3Operation s3Operation) {
- Counter counter = OPERATION_COUNTER_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
- .newCounter("operation_count" + Counter.SUFFIX, tags(s3Operation)));
- return counter == null ? new NoopCounter() : counter;
- }
-
- private static Histogram getOrCreateHistMetrics(S3Operation s3Operation) {
- Histogram hist = OPERATION_HIST_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
- .newHistogram("operation_time", tags(s3Operation)));
- return hist == null ? new NoopHistogram() : hist;
- }
-
- private static Map tags(S3Operation s3Operation) {
- return Map.of(
- "operation", s3Operation.getName(),
- "op_type", s3Operation.getType().getName());
- }
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java
deleted file mode 100644
index 547485c23..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectMetricsStats.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.metrics.stats;
-
-import com.automq.stream.s3.metrics.Counter;
-import com.automq.stream.s3.metrics.Histogram;
-import com.automq.stream.s3.metrics.NoopCounter;
-import com.automq.stream.s3.metrics.NoopHistogram;
-import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
-import com.automq.stream.s3.metrics.operations.S3ObjectStage;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class S3ObjectMetricsStats {
- private static final Map S3_OBJECT_TIME_MAP = new ConcurrentHashMap<>();
- public static Counter s3ObjectCounter = null;
- public static Histogram s3ObjectUploadSizeHist = null;
- public static Histogram s3ObjectDownloadSizeHist = null;
-
- public static Counter getOrCreateS3ObjectCounter() {
- if (s3ObjectCounter == null) {
- s3ObjectCounter = S3StreamMetricsRegistry.getMetricsGroup().newCounter("s3_object_count" + Counter.SUFFIX, Collections.emptyMap());
- }
- return s3ObjectCounter == null ? new NoopCounter() : s3ObjectCounter;
- }
-
- public static Histogram getOrCreates3ObjectUploadSizeHist() {
- if (s3ObjectUploadSizeHist == null) {
- s3ObjectUploadSizeHist = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap());
- }
- return s3ObjectUploadSizeHist == null ? new NoopHistogram() : s3ObjectUploadSizeHist;
- }
-
- public static Histogram getOrCreates3ObjectDownloadSizeHist() {
- if (s3ObjectDownloadSizeHist == null) {
- s3ObjectDownloadSizeHist = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap());
- }
- return s3ObjectDownloadSizeHist == null ? new NoopHistogram() : s3ObjectDownloadSizeHist;
- }
-
- public static Histogram getHistogram(S3ObjectStage stage) {
- Histogram hist = S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> {
- Map tags = Map.of("stage", stage.getName());
- return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_stage_time", tags);
- });
- return hist == null ? new NoopHistogram() : hist;
- }
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
index 0fde32023..65f5d9f76 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
@@ -17,7 +17,7 @@
package com.automq.stream.s3.network;
-import com.automq.stream.s3.metrics.stats.NetworkMetricsStats;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Objects;
@@ -78,22 +78,7 @@ public AsyncNetworkBandwidthLimiter(Type type, long tokenSize, int refillInterva
lock.unlock();
}
}, refillIntervalMs, refillIntervalMs, TimeUnit.MILLISECONDS);
- NetworkMetricsStats.registerNetworkInboundAvailableBandwidth(type, () -> {
- lock.lock();
- try {
- return availableTokens;
- } finally {
- lock.unlock();
- }
- });
- NetworkMetricsStats.registerNetworkLimiterQueueSize(type, () -> {
- lock.lock();
- try {
- return queuedCallbacks.size();
- } finally {
- lock.unlock();
- }
- });
+ S3StreamMetricsManager.registerNetworkLimiterGauge(type, this::getAvailableTokens, this::getQueueSize);
}
public void shutdown() {
@@ -114,6 +99,15 @@ public long getAvailableTokens() {
}
}
+ public int getQueueSize() {
+ lock.lock();
+ try {
+ return queuedCallbacks.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
public void forceConsume(long size) {
lock.lock();
try {
@@ -155,9 +149,9 @@ private CompletableFuture consume(int priority, long size) {
private void logMetrics(long size) {
if (type == Type.INBOUND) {
- NetworkMetricsStats.getOrCreateNetworkInboundUsageCounter().inc(size);
+ S3StreamMetricsManager.recordNetworkInboundUsage(size);
} else {
- NetworkMetricsStats.getOrCreateNetworkOutboundUsageCounter().inc(size);
+ S3StreamMetricsManager.recordNetworkOutboundUsage(size);
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index 741a52666..44deb5276 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -18,10 +18,9 @@
package com.automq.stream.s3.operator;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
-import com.automq.stream.s3.metrics.stats.S3ObjectMetricsStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.utils.FutureUtil;
@@ -269,7 +268,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture {
long size = end - start + 1;
- S3ObjectMetricsStats.getOrCreates3ObjectDownloadSizeHist().update(size);
+ S3StreamMetricsManager.recordObjectDownloadSize(size);
CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
responsePublisher.subscribe((bytes) -> {
// the aws client will copy DefaultHttpContent to heap ByteBuffer
@@ -279,12 +278,13 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture {
- OperationMetricsStats.getHistogram(S3Operation.GET_OBJECT_FAIL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.GET_OBJECT_FAIL);
if (isUnrecoverable(ex)) {
LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
cf.completeExceptionally(ex);
@@ -323,12 +323,13 @@ private void write0(String path, ByteBuf data, CompletableFuture cf) {
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
- OperationMetricsStats.getHistogram(S3Operation.PUT_OBJECT).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordS3UploadSize(objectSize);
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT);
LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(null);
data.release();
}).exceptionally(ex -> {
- OperationMetricsStats.getHistogram(S3Operation.PUT_OBJECT_FAIL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.PUT_OBJECT_FAIL);
if (isUnrecoverable(ex)) {
LOGGER.error("PutObject for object {} fail", path, ex);
cf.completeExceptionally(ex);
@@ -351,10 +352,10 @@ public CompletableFuture delete(String path) {
TimerUtil timerUtil = new TimerUtil();
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
- OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECT).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT);
LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
}).exceptionally(ex -> {
- OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECT_FAIL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECT_FAIL);
LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return null;
});
@@ -374,11 +375,11 @@ public CompletableFuture> delete(List objectKeys) {
.build();
// TODO: handle not exist object, should we regard it as deleted or ignore it.
return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
- OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECTS).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS);
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
}).exceptionally(ex -> {
- OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECTS_FAIL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.DELETE_OBJECTS_FAIL);
LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return Collections.emptyList();
});
@@ -399,10 +400,10 @@ void createMultipartUpload0(String path, CompletableFuture cf) {
TimerUtil timerUtil = new TimerUtil();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
- OperationMetricsStats.getHistogram(S3Operation.CREATE_MULTI_PART_UPLOAD).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD);
cf.complete(createMultipartUploadResponse.uploadId());
}).exceptionally(ex -> {
- OperationMetricsStats.getHistogram(S3Operation.CREATE_MULTI_PART_UPLOAD_FAIL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_MULTI_PART_UPLOAD_FAIL);
if (isUnrecoverable(ex)) {
LOGGER.error("CreateMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
@@ -438,16 +439,18 @@ public CompletableFuture uploadPart(String path, String uploadId,
private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf part, CompletableFuture cf) {
TimerUtil timerUtil = new TimerUtil();
+ int size = part.readableBytes();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers());
UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId)
.partNumber(partNumber).build();
CompletableFuture uploadPartCf = writeS3Client.uploadPart(request, body);
uploadPartCf.thenAccept(uploadPartResponse -> {
- OperationMetricsStats.getHistogram(S3Operation.UPLOAD_PART).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordS3UploadSize(size);
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART);
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
- OperationMetricsStats.getHistogram(S3Operation.UPLOAD_PART_FAIL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_FAIL);
if (isUnrecoverable(ex)) {
LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
@@ -476,12 +479,12 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
- OperationMetricsStats.getHistogram(S3Operation.UPLOAD_PART_COPY).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY);
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber)
.eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
- OperationMetricsStats.getHistogram(S3Operation.UPLOAD_PART_COPY_FAIL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_PART_COPY_FAIL);
if (isUnrecoverable(ex)) {
LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
@@ -510,10 +513,10 @@ public void completeMultipartUpload0(String path, String uploadId, List {
- OperationMetricsStats.getHistogram(S3Operation.COMPLETE_MULTI_PART_UPLOAD).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD);
cf.complete(null);
}).exceptionally(ex -> {
- OperationMetricsStats.getHistogram(S3Operation.COMPLETE_MULTI_PART_UPLOAD_FAIL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.COMPLETE_MULTI_PART_UPLOAD_FAIL);
if (isUnrecoverable(ex)) {
LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
index 45e016387..dc23326e9 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
@@ -18,9 +18,9 @@
package com.automq.stream.s3.operator;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
-import com.automq.stream.s3.metrics.stats.S3ObjectMetricsStats;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.utils.FutureUtil;
import io.netty.buffer.ByteBuf;
@@ -140,14 +140,14 @@ public CompletableFuture close() {
objectPart = null;
}
- S3ObjectMetricsStats.getHistogram(S3ObjectStage.READY_CLOSE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordObjectStageCost(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.READY_CLOSE);
closeCf = new CompletableFuture<>();
CompletableFuture uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])));
FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, genCompleteParts())), closeCf);
closeCf.whenComplete((nil, ex) -> {
- S3ObjectMetricsStats.getHistogram(S3ObjectStage.TOTAL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
- S3ObjectMetricsStats.getOrCreateS3ObjectCounter().inc();
- S3ObjectMetricsStats.getOrCreates3ObjectUploadSizeHist().update(totalWriteSize.get());
+ S3StreamMetricsManager.recordObjectStageCost(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.TOTAL);
+ S3StreamMetricsManager.recordObjectNum(1);
+ S3StreamMetricsManager.recordObjectUploadSize(totalWriteSize.get());
});
return closeCf;
}
@@ -224,7 +224,7 @@ public void upload() {
private void upload0() {
TimerUtil timerUtil = new TimerUtil();
FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf);
- partCf.whenComplete((nil, ex) -> S3ObjectMetricsStats.getHistogram(S3ObjectStage.UPLOAD_PART).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
+ S3StreamMetricsManager.recordObjectStageCost(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3ObjectStage.UPLOAD_PART);
}
public long size() {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
index f6425bc4b..60f11910d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
@@ -18,10 +18,8 @@
package com.automq.stream.s3.wal;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
-import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.ByteBufMetricsStats;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.wal.util.WALUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
@@ -113,10 +111,10 @@ public ByteBuf data() {
data = DirectByteBufAlloc.compositeByteBuffer();
for (Supplier supplier : records) {
ByteBuf record = supplier.get();
- ByteBufMetricsStats.getHistogram("wal_record").update(record.readableBytes());
+ S3StreamMetricsManager.recordAllocateByteBufSize(record.readableBytes(), "wal_record");
data.addComponent(true, record);
}
- ByteBufMetricsStats.getHistogram("wal_block").update(data.readableBytes());
+ S3StreamMetricsManager.recordAllocateByteBufSize(data.readableBytes(), "wal_block");
return data;
}
@@ -127,6 +125,6 @@ public long size() {
@Override
public void polled() {
- OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL_BLOCK_POLLED).update(timer.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "block_polled");
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
index f6c9d7ec7..9b2e87a96 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
@@ -19,9 +19,9 @@
import com.automq.stream.s3.Config;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.wal.util.WALChannel;
import com.automq.stream.s3.wal.util.WALUtil;
import com.automq.stream.utils.ThreadUtils;
@@ -367,11 +367,12 @@ public WALMetadata metadata() {
@Override
public AppendResult append(ByteBuf buf, int crc) throws OverCapacityException {
+ TimerUtil timerUtil = new TimerUtil();
try {
return append0(buf, crc);
} catch (OverCapacityException ex) {
buf.release();
- OperationMetricsStats.getCounter(S3Operation.APPEND_STORAGE_WAL_FULL).inc();
+ S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_WAL_FULL);
throw ex;
}
}
@@ -402,8 +403,9 @@ private AppendResult append0(ByteBuf body, int crc) throws OverCapacityException
slidingWindowService.tryWriteBlock();
final AppendResult appendResult = new AppendResultImpl(expectedWriteOffset, appendResultFuture);
- appendResult.future().whenComplete((nil, ex) -> OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
- OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL_BEFORE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ appendResult.future().whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(
+ timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_WAL));
+ S3StreamMetricsManager.recordAppendWALLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), "before");
return appendResult;
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
index dc65f97a6..d96083847 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
@@ -18,9 +18,8 @@
package com.automq.stream.s3.wal;
import com.automq.stream.s3.DirectByteBufAlloc;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
-import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.wal.util.WALChannel;
import com.automq.stream.s3.wal.util.WALUtil;
import com.automq.stream.utils.FutureUtil;
@@ -343,7 +342,7 @@ private void writeBlockData(BlockBatch blocks) throws IOException {
walChannel.write(block.data(), position);
}
walChannel.flush();
- OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL_WRITE).update(timer.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "write");
}
private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException, OverCapacityException {
@@ -532,7 +531,7 @@ public WriteBlockProcessor(BlockBatch blocks) {
@Override
public void run() {
- OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL_AWAIT).update(timer.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "await");
writeBlock(this.blocks);
}
@@ -556,7 +555,7 @@ public String toString() {
return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}';
}
});
- OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL_AFTER).update(timer.elapsedAs(TimeUnit.NANOSECONDS));
+ S3StreamMetricsManager.recordAppendWALLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), "after");
} catch (Exception e) {
FutureUtil.completeExceptionally(blocks.futures(), e);
LOGGER.error(String.format("failed to write blocks, startOffset: %s", blocks.startOffset()), e);
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java b/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java
deleted file mode 100644
index ebf181763..000000000
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/BaseStreamMetrics.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.rocketmq.store.metrics;
-
-import io.opentelemetry.api.common.Attributes;
-import io.opentelemetry.api.common.AttributesBuilder;
-import io.opentelemetry.api.metrics.Meter;
-import java.util.Map;
-import java.util.function.Supplier;
-
-public class BaseStreamMetrics {
- private static final String STREAM_METRICS_PREFIX = "rocketmq_stream_";
-
- protected final Meter meter;
- protected final Supplier attributesBuilderSupplier;
- protected final Map tags;
- protected final String metricsName;
-
- protected BaseStreamMetrics(String name, Map tags,
- Meter meter, Supplier attributesBuilderSupplier) {
- this.metricsName = metricsName(name);
- this.tags = tags;
- this.meter = meter;
- this.attributesBuilderSupplier = attributesBuilderSupplier;
- }
-
- protected AttributesBuilder newAttributesBuilder() {
- AttributesBuilder builder;
- if (attributesBuilderSupplier == null) {
- builder = Attributes.builder();
- } else {
- builder = attributesBuilderSupplier.get();
- }
- tags.forEach(builder::put);
- return builder;
- }
-
- protected String metricsName(String name) {
- name = name.toLowerCase();
- return STREAM_METRICS_PREFIX + name;
- }
-}
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StoreMetricsConstant.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StoreMetricsConstant.java
index c93f12f2c..fcf64ab47 100644
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/StoreMetricsConstant.java
+++ b/store/src/main/java/com/automq/rocketmq/store/metrics/StoreMetricsConstant.java
@@ -23,7 +23,7 @@ public class StoreMetricsConstant {
public static final String GAUGE_CONSUMER_QUEUEING_LATENCY = "rocketmq_consumer_queueing_latency";
public static final String GAUGE_CONSUMER_READY_MESSAGES = "rocketmq_consumer_ready_messages";
public static final String COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL = "rocketmq_send_to_dlq_messages_total";
- public static final String HISTOGRAM_STREAM_OPERATION_TIME = "rocketmq_stream_operation_time";
+ public static final String HISTOGRAM_STREAM_OPERATION_TIME = "rocketmq_stream_operation_latency";
public static final String LABEL_TOPIC = "topic";
public static final String LABEL_QUEUE_ID = "queue_id";
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java
deleted file mode 100644
index 78cdb72d2..000000000
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsCounter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.rocketmq.store.metrics;
-
-import com.automq.stream.s3.metrics.Counter;
-import io.opentelemetry.api.common.AttributesBuilder;
-import io.opentelemetry.api.metrics.LongCounter;
-import io.opentelemetry.api.metrics.Meter;
-import java.util.Map;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamMetricsCounter extends BaseStreamMetrics implements Counter {
- protected static final Logger LOGGER = LoggerFactory.getLogger(StreamMetricsCounter.class);
-
- private final LongCounter counter;
-
- public StreamMetricsCounter(String name, Map tags,
- Meter meter, Supplier attributesBuilderSupplier) {
- super(name, tags, meter, attributesBuilderSupplier);
- this.counter = this.meter.counterBuilder(this.metricsName)
- .build();
- }
-
- @Override
- public void inc() {
- inc(1);
- }
-
- @Override
- public void inc(long n) {
- if (n < 0) {
- String tag = tags.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()).collect(Collectors.joining(", "));
- LOGGER.warn("Counter value is negative, name: {}, tag: {}, value: {}", metricsName, tag, n);
- return;
- }
- counter.add(n, newAttributesBuilder().build());
- }
-
- @Override
- public long count() {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java
deleted file mode 100644
index e0c89b37f..000000000
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsGauge.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.rocketmq.store.metrics;
-
-import com.automq.stream.s3.metrics.Gauge;
-import io.opentelemetry.api.common.AttributesBuilder;
-import io.opentelemetry.api.metrics.Meter;
-import java.util.Map;
-import java.util.function.Supplier;
-
-public class StreamMetricsGauge extends BaseStreamMetrics {
-
- public StreamMetricsGauge(String name, Map tags,
- Meter meter, Supplier attributesBuilderSupplier, Gauge gauge) {
- super(name, tags, meter, attributesBuilderSupplier);
- this.meter.gaugeBuilder(this.metricsName)
- .ofLongs()
- .buildWithCallback(measurement -> measurement.record(gauge.value(), newAttributesBuilder().build()));
- }
-}
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java
deleted file mode 100644
index 8e5a794ed..000000000
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsHistogram.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.rocketmq.store.metrics;
-
-import com.automq.stream.s3.metrics.Histogram;
-import io.opentelemetry.api.common.AttributesBuilder;
-import io.opentelemetry.api.metrics.LongHistogram;
-import io.opentelemetry.api.metrics.Meter;
-import java.util.Map;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamMetricsHistogram extends BaseStreamMetrics implements Histogram {
- protected static final Logger LOGGER = LoggerFactory.getLogger(StreamMetricsHistogram.class);
-
- private final LongHistogram histogram;
-
- public StreamMetricsHistogram(String name, Map tags,
- Meter meter, Supplier attributesBuilderSupplier) {
- super(name, tags, meter, attributesBuilderSupplier);
- histogram = this.meter.histogramBuilder(this.metricsName)
- .ofLongs()
- .build();
- }
-
-
- @Override
- public void update(long value) {
- if (value < 0) {
- String tag = tags.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()).collect(Collectors.joining(", "));
- LOGGER.warn("Histogram value is negative, name: {}, tag: {}, value: {}", metricsName, tag, value);
- return;
- }
- histogram.record(value, newAttributesBuilder().build());
- }
-
- @Override
- public long count() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public double mean() {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java
index 7299a2882..7455a3b71 100644
--- a/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java
+++ b/store/src/main/java/com/automq/rocketmq/store/metrics/StreamMetricsManager.java
@@ -18,11 +18,7 @@
package com.automq.rocketmq.store.metrics;
import com.automq.rocketmq.common.MetricsManager;
-import com.automq.stream.s3.metrics.Counter;
-import com.automq.stream.s3.metrics.Gauge;
-import com.automq.stream.s3.metrics.Histogram;
-import com.automq.stream.s3.metrics.NoopS3StreamMetricsGroup;
-import com.automq.stream.s3.metrics.S3StreamMetricsGroup;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.Aggregation;
@@ -33,29 +29,23 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;
-public class StreamMetricsManager implements MetricsManager, S3StreamMetricsGroup {
-
- private static Supplier attributesBuilderSupplier;
- private static Meter meter;
- private static NoopS3StreamMetricsGroup noopS3StreamMetricsGroup = new NoopS3StreamMetricsGroup();
-
+public class StreamMetricsManager implements MetricsManager {
+ private static final String STREAM_METRICS_PREFIX = "rocketmq_stream_";
@Override
public void initAttributesBuilder(Supplier attributesBuilderSupplier) {
- StreamMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
+ S3StreamMetricsManager.initAttributesBuilder(attributesBuilderSupplier);
}
@Override
public void initStaticMetrics(Meter meter) {
- StreamMetricsManager.meter = meter;
+ S3StreamMetricsManager.initMetrics(meter, STREAM_METRICS_PREFIX);
}
@Override
public void initDynamicMetrics(Meter meter) {
- StreamMetricsManager.meter = meter;
}
public static List> getMetricsView() {
@@ -91,29 +81,4 @@ public static List> getMetricsView() {
return metricsViewList;
}
-
- @Override
- public Counter newCounter(String name, Map tags) {
- if (meter != null && attributesBuilderSupplier != null) {
- return new StreamMetricsCounter(name, tags, meter, attributesBuilderSupplier);
- }
- return noopS3StreamMetricsGroup.newCounter(name, tags);
- }
-
- @Override
- public Histogram newHistogram(String name, Map tags) {
- if (meter != null && attributesBuilderSupplier != null) {
- return new StreamMetricsHistogram(name, tags, meter, attributesBuilderSupplier);
- }
- return noopS3StreamMetricsGroup.newHistogram(name, tags);
- }
-
- @Override
- public void newGauge(String name, Map tags, Gauge gauge) {
- if (meter != null && attributesBuilderSupplier != null) {
- new StreamMetricsGauge(name, tags, meter, attributesBuilderSupplier, gauge);
- return;
- }
- noopS3StreamMetricsGroup.newGauge(name, tags, gauge);
- }
}