diff --git a/s3stream/build.gradle b/s3stream/build.gradle
index 8ffff008d..47e059998 100644
--- a/s3stream/build.gradle
+++ b/s3stream/build.gradle
@@ -48,6 +48,7 @@ dependencies {
api 'org.aspectj:aspectjrt:1.9.20.1'
api 'org.aspectj:aspectjweaver:1.9.20.1'
api 'com.github.jnr:jnr-posix:3.1.19'
+ api 'com.yammer.metrics:metrics-core:2.2.0'
testImplementation 'org.slf4j:slf4j-simple:2.0.9'
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'
testImplementation 'org.mockito:mockito-core:5.5.0'
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index f28f908c5..27ed9c9e2 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -133,6 +133,11 @@
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
+ <dependency>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>2.2.0</version>
+ </dependency>
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index 8c1dc718a..11ab16d80 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -19,7 +19,6 @@
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.metadata.StreamMetadata;
-import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
@@ -334,7 +333,7 @@ public CompletableFuture append(AppendContext context, StreamRecordBatch s
append0(context, writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
- StorageOperationStats.getInstance().appendStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().appendStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
});
return cf;
}
@@ -354,7 +353,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f
if (!fromBackoff) {
backoffRecords.offer(request);
}
- StorageOperationStats.getInstance().appendLogCacheFullStats.record(MetricsLevel.INFO, 0L);
+ StorageOperationStats.getInstance().appendLogCacheFullStats.record(0L);
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
@@ -437,7 +436,7 @@ public CompletableFuture read(FetchContext context,
TimerUtil timerUtil = new TimerUtil();
CompletableFuture cf = new CompletableFuture<>();
FutureUtil.propagate(read0(context, streamId, startOffset, endOffset, maxBytes), cf);
- cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().readStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
+ cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().readStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
return cf;
}
@@ -515,7 +514,7 @@ public CompletableFuture forceUpload(long streamId) {
CompletableFuture cf = new CompletableFuture<>();
// Wait for a while to group force upload tasks.
forceUploadTicker.tick().whenComplete((nil, ex) -> {
- StorageOperationStats.getInstance().forceUploadWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().forceUploadWALAwaitStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS));
uploadDeltaWAL(streamId, true);
// Wait for all tasks contains streamId complete.
FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.stream()
@@ -525,7 +524,7 @@ public CompletableFuture forceUpload(long streamId) {
callbackSequencer.tryFree(streamId);
}
});
- cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().forceUploadWALCompleteStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)));
+ cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().forceUploadWALCompleteStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS)));
return cf;
}
@@ -557,7 +556,7 @@ private void handleAppendCallback0(WalWriteRequest request) {
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
- StorageOperationStats.getInstance().appendCallbackStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().appendCallbackStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS));
}
private Lock getStreamCallbackLock(long streamId) {
@@ -602,7 +601,7 @@ CompletableFuture uploadDeltaWAL(DeltaWALUploadTaskContext context) {
inflightWALUploadTasks.add(context);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
cf.whenComplete((nil, ex) -> {
- StorageOperationStats.getInstance().uploadWALCompleteStats.record(MetricsLevel.INFO, context.timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().uploadWALCompleteStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS));
inflightWALUploadTasks.remove(context);
if (ex != null) {
LOGGER.error("upload delta WAL fail", ex);
@@ -643,11 +642,11 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {
private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
context.task.prepare().thenAcceptAsync(nil -> {
- StorageOperationStats.getInstance().uploadWALPrepareStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().uploadWALPrepareStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS));
// 1. poll out current task and trigger upload.
DeltaWALUploadTaskContext peek = walPrepareQueue.poll();
Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> StorageOperationStats.getInstance()
- .uploadWALUploadStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS)));
+ .uploadWALUploadStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS)));
// 2. add task to commit queue.
boolean walObjectCommitQueueEmpty = walCommitQueue.isEmpty();
walCommitQueue.add(peek);
@@ -664,7 +663,7 @@ private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
context.task.commit().thenAcceptAsync(nil -> {
- StorageOperationStats.getInstance().uploadWALCommitStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().uploadWALCommitStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS));
// 1. poll out current task
walCommitQueue.poll();
if (context.cache.confirmOffset() != 0) {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
index 0dcf92658..a2890f60a 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -24,7 +24,6 @@
import com.automq.stream.s3.cache.CacheAccessType;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
-import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StreamOperationStats;
import com.automq.stream.s3.model.StreamRecordBatch;
@@ -144,7 +143,7 @@ public CompletableFuture append(AppendContext context, RecordBatch
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
- StreamOperationStats.getInstance().appendStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ StreamOperationStats.getInstance().appendStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
pendingAppends.remove(cf);
});
return cf;
@@ -190,7 +189,7 @@ public CompletableFuture fetch(FetchContext context,
CompletableFuture cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
- StreamOperationStats.getInstance().fetchStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ StreamOperationStats.getInstance().fetchStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
Throwable cause = FutureUtil.cause(ex);
if (!(cause instanceof FastReadFailFastException)) {
@@ -256,7 +255,7 @@ public CompletableFuture trim(long newStartOffset) {
CompletableFuture cf = new CompletableFuture<>();
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
- cf.whenComplete((nil, ex) -> StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
+ cf.whenComplete((nil, ex) -> StreamOperationStats.getInstance().trimStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
return cf;
}, LOGGER, "trim");
} finally {
@@ -307,10 +306,10 @@ public CompletableFuture close() {
closeCf.whenComplete((nil, ex) -> {
if (ex != null) {
LOGGER.error("{} close fail", logIdent, ex);
- StreamOperationStats.getInstance().closeStreamStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ StreamOperationStats.getInstance().closeStreamStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
} else {
LOGGER.info("{} closed", logIdent);
- StreamOperationStats.getInstance().closeStreamStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ StreamOperationStats.getInstance().closeStreamStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
}
});
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
index 78499d4b6..2dee62a96 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
@@ -20,7 +20,6 @@
import com.automq.stream.api.StreamClient;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
-import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StreamOperationStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
@@ -83,7 +82,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
public CompletableFuture createAndOpenStream(CreateStreamOptions options) {
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
- StreamOperationStats.getInstance().createStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ StreamOperationStats.getInstance().createStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return openStream0(streamId, options.epoch());
}), LOGGER, "createAndOpenStream");
}
@@ -117,7 +116,7 @@ private CompletableFuture openStream0(long streamId, long epoch) {
metadata.startOffset(), metadata.endOffset(),
storage, streamManager, networkInboundBucket, networkOutboundBucket));
openedStreams.put(streamId, stream);
- StreamOperationStats.getInstance().openStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ StreamOperationStats.getInstance().openStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return stream;
});
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
index 2e83e14e3..6ea9d9f05 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -12,7 +12,6 @@
package com.automq.stream.s3.cache;
import com.automq.stream.s3.Config;
-import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.model.StreamRecordBatch;
@@ -105,7 +104,7 @@ public CompletableFuture read(TraceContext traceContext,
long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT;
- StorageOperationStats.getInstance().readBlockCacheStats(isCacheHit).record(MetricsLevel.INFO, timeElapsed);
+ StorageOperationStats.getInstance().readBlockCacheStats(isCacheHit).record(timeElapsed);
Span.fromContext(finalTraceContext.currentContext()).setAttribute("cache_hit", isCacheHit);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {}",
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
index 8e708d25a..85e2623e2 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -11,7 +11,6 @@
package com.automq.stream.s3.cache;
-import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
@@ -94,7 +93,7 @@ public boolean put(StreamRecordBatch recordBatch) {
} finally {
readLock.unlock();
}
- StorageOperationStats.getInstance().appendLogCacheStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().appendLogCacheStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return full;
}
@@ -145,7 +144,7 @@ public List get(TraceContext context,
long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = !records.isEmpty() && records.get(0).getBaseOffset() <= startOffset;
- StorageOperationStats.getInstance().readLogCacheStats(isCacheHit).record(MetricsLevel.INFO, timeElapsed);
+ StorageOperationStats.getInstance().readLogCacheStats(isCacheHit).record(timeElapsed);
return records;
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
index 4160e04f6..efdeab097 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
@@ -11,7 +11,6 @@
package com.automq.stream.s3.cache;
-import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.utils.LogContext;
@@ -93,7 +92,7 @@ public void updateReadAheadResult(long readAheadEndOffset, int readAheadSize) {
lock.lock();
this.readAheadEndOffset = readAheadEndOffset;
this.lastReadAheadSize = readAheadSize;
- StorageOperationStats.getInstance().readAheadSizeStats.record(MetricsLevel.INFO, readAheadSize);
+ StorageOperationStats.getInstance().readAheadSizeStats.record(readAheadSize);
if (logger.isDebugEnabled()) {
logger.debug("update read ahead offset {}, size: {}, lastReadOffset: {}", readAheadEndOffset, readAheadSize, lastReadOffset);
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
index 3cea50a19..6f3610a08 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
@@ -14,7 +14,6 @@
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
-import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.model.StreamRecordBatch;
@@ -131,7 +130,7 @@ public CompletableFuture> syncReadAhead(TraceContext tra
completeInflightTask0(key, ex);
}
context.taskKeySet.clear();
- StorageOperationStats.getInstance().blockCacheReadAheadStats(true).record(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().blockCacheReadAheadStats(true).record(timer.elapsedAs(TimeUnit.NANOSECONDS));
});
}
@@ -168,7 +167,7 @@ CompletableFuture> handleSyncReadAhead(TraceContext trac
CompletableFuture throttleCf = inflightReadThrottle.acquire(traceContext, uuid, totalReserveSize);
return throttleCf.thenComposeAsync(nil -> {
// concurrently read all data blocks
- StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(MetricsLevel.INFO, throttleTimer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(throttleTimer.elapsedAs(TimeUnit.NANOSECONDS));
for (int i = 0; i < streamDataBlocksToRead.size(); i++) {
Pair pair = streamDataBlocksToRead.get(i);
ObjectReader objectReader = pair.getLeft();
@@ -296,7 +295,7 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int
completeInflightTask0(key, ex);
}
context.taskKeySet.clear();
- StorageOperationStats.getInstance().blockCacheReadAheadStats(false).record(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().blockCacheReadAheadStats(false).record(timer.elapsedAs(TimeUnit.NANOSECONDS));
});
}
@@ -364,7 +363,7 @@ CompletableFuture handleAsyncReadAhead(long streamId, long startOffset, lo
if (reserveResult.reserveSize() > 0) {
TimerUtil throttleTimer = new TimerUtil();
inflightReadThrottle.acquire(TraceContext.DEFAULT, uuid, reserveResult.reserveSize()).thenAcceptAsync(nil -> {
- StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(MetricsLevel.INFO, throttleTimer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(throttleTimer.elapsedAs(TimeUnit.NANOSECONDS));
// read data block
if (context.taskKeySet.contains(taskKey)) {
setInflightReadAheadStatus(taskKey, DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
index 2bce6ac7b..5d5dbf460 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
@@ -69,8 +69,6 @@ public class S3StreamMetricsConstant {
public static final String OPERATION_LATENCY_METRIC_NAME = "operation_latency";
public static final String OBJECT_COUNT_METRIC_NAME = "object_count";
public static final String OBJECT_STAGE_COST_METRIC_NAME = "object_stage_cost";
- public static final String OBJECT_UPLOAD_SIZE_METRIC_NAME = "object_upload_size";
- public static final String OBJECT_DOWNLOAD_SIZE_METRIC_NAME = "object_download_size";
public static final String NETWORK_INBOUND_USAGE_METRIC_NAME = "network_inbound_usage";
public static final String NETWORK_OUTBOUND_USAGE_METRIC_NAME = "network_outbound_usage";
public static final String NETWORK_INBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME = "network_inbound_available_bandwidth";
@@ -80,6 +78,12 @@ public class S3StreamMetricsConstant {
public static final String NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_inbound_limiter_queue_time";
public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_outbound_limiter_queue_time";
public static final String READ_AHEAD_SIZE_METRIC_NAME = "read_ahead_size";
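+ // Suffixes appended to histogram metric names for the exported aggregate series (sum, count, p50, p99, mean, max)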
+ public static final String SUM_METRIC_NAME_SUFFIX = "_sum";
+ public static final String COUNT_METRIC_NAME_SUFFIX = "_count";
+ public static final String P50_METRIC_NAME_SUFFIX = "_50p";
+ public static final String P99_METRIC_NAME_SUFFIX = "_99p";
+ public static final String MEAN_METRIC_NAME_SUFFIX = "_mean";
+ public static final String MAX_METRIC_NAME_SUFFIX = "_max";
public static final String WAL_START_OFFSET = "wal_start_offset";
public static final String WAL_TRIMMED_OFFSET = "wal_trimmed_offset";
public static final String DELTA_WAL_CACHE_SIZE = "delta_wal_cache_size";
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
index 5da77fbcf..c50bda24d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
@@ -17,37 +17,45 @@
import com.automq.stream.s3.metrics.operations.S3Stage;
import com.automq.stream.s3.metrics.wrapper.ConfigListener;
import com.automq.stream.s3.metrics.wrapper.CounterMetric;
-import com.automq.stream.s3.metrics.wrapper.HistogramMetric;
+import com.automq.stream.s3.metrics.wrapper.HistogramInstrument;
+import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
-import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
public class S3StreamMetricsManager {
private static final List<ConfigListener> BASE_ATTRIBUTES_LISTENERS = new ArrayList<>();
+ public static final MetricsRegistry METRICS_REGISTRY = new MetricsRegistry();
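+ // Per-instrument lists of Yammer-backed histogram metrics; the builder methods below register each metric here, and the matching HistogramInstrument receives the list via a supplier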
+ public static final List<YammerHistogramMetric> OPERATION_LATENCY_METRICS = new CopyOnWriteArrayList<>();
+ public static final List<YammerHistogramMetric> OBJECT_STAGE_METRICS = new CopyOnWriteArrayList<>();
+ public static final List<YammerHistogramMetric> NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>();
+ public static final List<YammerHistogramMetric> NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>();
+ public static final List<YammerHistogramMetric> READ_AHEAD_SIZE_METRICS = new CopyOnWriteArrayList<>();
+ public static final List<YammerHistogramMetric> READ_AHEAD_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>();
private static LongCounter s3DownloadSizeInTotal = new NoopLongCounter();
private static LongCounter s3UploadSizeInTotal = new NoopLongCounter();
- private static LongHistogram operationLatency = new NoopLongHistogram();
+ private static HistogramInstrument operationLatency;
private static LongCounter objectNumInTotal = new NoopLongCounter();
- private static LongHistogram objectStageCost = new NoopLongHistogram();
- private static LongHistogram objectUploadSize = new NoopLongHistogram();
- private static LongHistogram objectDownloadSize = new NoopLongHistogram();
+ private static HistogramInstrument objectStageCost;
private static LongCounter networkInboundUsageInTotal = new NoopLongCounter();
private static LongCounter networkOutboundUsageInTotal = new NoopLongCounter();
private static ObservableLongGauge networkInboundAvailableBandwidth = new NoopObservableLongGauge();
private static ObservableLongGauge networkOutboundAvailableBandwidth = new NoopObservableLongGauge();
private static ObservableLongGauge networkInboundLimiterQueueSize = new NoopObservableLongGauge();
private static ObservableLongGauge networkOutboundLimiterQueueSize = new NoopObservableLongGauge();
- private static LongHistogram networkInboundLimiterQueueTime = new NoopLongHistogram();
- private static LongHistogram networkOutboundLimiterQueueTime = new NoopLongHistogram();
- private static LongHistogram readAheadSize = new NoopLongHistogram();
- private static LongHistogram readAheadLimierQueueTime = new NoopLongHistogram();
+ private static HistogramInstrument networkInboundLimiterQueueTime;
+ private static HistogramInstrument networkOutboundLimiterQueueTime;
+ private static HistogramInstrument readAheadSize;
+ private static HistogramInstrument readAheadLimierQueueTime;
private static ObservableLongGauge deltaWalStartOffset = new NoopObservableLongGauge();
private static ObservableLongGauge deltaWalTrimmedOffset = new NoopObservableLongGauge();
private static ObservableLongGauge deltaWalCacheSize = new NoopObservableLongGauge();
@@ -102,31 +110,13 @@ public static void initMetrics(Meter meter, String prefix) {
.setDescription("S3 upload size")
.setUnit("bytes")
.build();
- operationLatency = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OPERATION_LATENCY_METRIC_NAME)
- .setDescription("Operations latency")
- .setUnit("nanoseconds")
- .ofLongs()
- .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES)
- .build();
+ operationLatency = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OPERATION_LATENCY_METRIC_NAME,
+ "Operation latency", "nanoseconds", () -> OPERATION_LATENCY_METRICS);
objectNumInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.OBJECT_COUNT_METRIC_NAME)
.setDescription("Objects count")
.build();
- objectStageCost = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OBJECT_STAGE_COST_METRIC_NAME)
- .setDescription("Objects stage cost")
- .setUnit("nanoseconds")
- .ofLongs()
- .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES)
- .build();
- objectUploadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OBJECT_UPLOAD_SIZE_METRIC_NAME)
- .setDescription("Objects upload size")
- .setUnit("bytes")
- .ofLongs()
- .build();
- objectDownloadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OBJECT_DOWNLOAD_SIZE_METRIC_NAME)
- .setDescription("Objects download size")
- .setUnit("bytes")
- .ofLongs()
- .build();
+ objectStageCost = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OBJECT_STAGE_COST_METRIC_NAME,
+ "Objects stage cost", "nanoseconds", () -> OBJECT_STAGE_METRICS);
networkInboundUsageInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_USAGE_METRIC_NAME)
.setDescription("Network inbound usage")
.setUnit("bytes")
@@ -169,29 +159,14 @@ public static void initMetrics(Meter meter, String prefix) {
result.record((long) networkOutboundLimiterQueueSizeSupplier.get(), metricsConfig.getBaseAttributes());
}
});
- networkInboundLimiterQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME)
- .setDescription("Network inbound limiter queue time")
- .setUnit("nanoseconds")
- .ofLongs()
- .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES)
- .build();
- networkOutboundLimiterQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME)
- .setDescription("Network outbound limiter queue time")
- .setUnit("nanoseconds")
- .ofLongs()
- .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES)
- .build();
- readAheadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.READ_AHEAD_SIZE_METRIC_NAME)
- .setDescription("Read ahead size")
- .setUnit("bytes")
- .ofLongs()
- .build();
- readAheadLimierQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.READ_AHEAD_QUEUE_TIME_METRIC_NAME)
- .setDescription("Read ahead limiter queue time")
- .setUnit("nanoseconds")
- .ofLongs()
- .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES)
- .build();
+ networkInboundLimiterQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME,
+ "Network inbound limiter queue time", "nanoseconds", () -> NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS);
+ networkOutboundLimiterQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME,
+ "Network outbound limiter queue time", "nanoseconds", () -> NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS);
+ readAheadSize = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.READ_AHEAD_SIZE_METRIC_NAME,
+ "Read ahead size", "bytes", () -> READ_AHEAD_SIZE_METRICS);
+ readAheadLimierQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.READ_AHEAD_QUEUE_TIME_METRIC_NAME,
+ "Read ahead limiter queue time", "nanoseconds", () -> READ_AHEAD_LIMITER_QUEUE_TIME_METRICS);
deltaWalStartOffset = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.WAL_START_OFFSET)
.setDescription("Delta WAL start offset")
.ofLongs()
@@ -353,35 +328,43 @@ public static CounterMetric buildS3DownloadSizeMetric() {
}
}
- public static HistogramMetric buildStageOperationMetric(S3Stage stage) {
+ public static YammerHistogramMetric buildStageOperationMetric(MetricName metricName, MetricsLevel metricsLevel, S3Stage stage) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(stage), operationLatency);
+ YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel,
+ metricsConfig, AttributesUtils.buildAttributes(stage));
BASE_ATTRIBUTES_LISTENERS.add(metric);
+ OPERATION_LATENCY_METRICS.add(metric);
return metric;
}
}
- public static HistogramMetric buildOperationMetric(S3Operation operation) {
+ public static YammerHistogramMetric buildOperationMetric(MetricName metricName, MetricsLevel metricsLevel, S3Operation operation) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(operation), operationLatency);
+ YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel,
+ metricsConfig, AttributesUtils.buildAttributes(operation));
BASE_ATTRIBUTES_LISTENERS.add(metric);
+ OPERATION_LATENCY_METRICS.add(metric);
return metric;
}
}
- public static HistogramMetric buildOperationMetric(S3Operation operation, String status, String sizeLabelName) {
+ public static YammerHistogramMetric buildOperationMetric(MetricName metricName, MetricsLevel metricsLevel,
+ S3Operation operation, String status, String sizeLabelName) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(operation,
- status, sizeLabelName), operationLatency);
+ YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig,
+ AttributesUtils.buildAttributes(operation, status, sizeLabelName));
BASE_ATTRIBUTES_LISTENERS.add(metric);
+ OPERATION_LATENCY_METRICS.add(metric);
return metric;
}
}
- public static HistogramMetric buildOperationMetric(S3Operation operation, String status) {
+ public static YammerHistogramMetric buildOperationMetric(MetricName metricName, MetricsLevel metricsLevel, S3Operation operation, String status) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(operation, status), operationLatency);
+ YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig,
+ AttributesUtils.buildAttributes(operation, status));
BASE_ATTRIBUTES_LISTENERS.add(metric);
+ OPERATION_LATENCY_METRICS.add(metric);
return metric;
}
}
@@ -394,26 +377,21 @@ public static CounterMetric buildObjectNumMetric() {
}
}
- public static HistogramMetric buildObjectStageCostMetric(S3ObjectStage stage) {
- synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(stage), objectStageCost);
- BASE_ATTRIBUTES_LISTENERS.add(metric);
- return metric;
- }
- }
-
- public static HistogramMetric buildObjectUploadSizeMetric() {
+ public static YammerHistogramMetric buildObjectStageCostMetric(MetricName metricName, MetricsLevel metricsLevel, S3ObjectStage stage) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, objectUploadSize);
+ YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig,
+ AttributesUtils.buildAttributes(stage));
BASE_ATTRIBUTES_LISTENERS.add(metric);
+ OBJECT_STAGE_METRICS.add(metric);
return metric;
}
}
- public static HistogramMetric buildObjectDownloadSizeMetric() {
+ public static YammerHistogramMetric buildObjectUploadSizeMetric(MetricName metricName, MetricsLevel metricsLevel) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, objectDownloadSize);
+ YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
BASE_ATTRIBUTES_LISTENERS.add(metric);
+ OBJECT_STAGE_METRICS.add(metric);
return metric;
}
}
@@ -434,35 +412,39 @@ public static CounterMetric buildNetworkOutboundUsageMetric() {
}
}
- public static HistogramMetric buildNetworkInboundLimiterQueueTimeMetric() {
+ public static YammerHistogramMetric buildNetworkInboundLimiterQueueTimeMetric(MetricName metricName, MetricsLevel metricsLevel) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, networkInboundLimiterQueueTime);
+ YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
BASE_ATTRIBUTES_LISTENERS.add(metric);
+ NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS.add(metric);
return metric;
}
}
- public static HistogramMetric buildNetworkOutboundLimiterQueueTimeMetric() {
+ public static YammerHistogramMetric buildNetworkOutboundLimiterQueueTimeMetric(MetricName metricName, MetricsLevel metricsLevel) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, networkOutboundLimiterQueueTime);
+ YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
BASE_ATTRIBUTES_LISTENERS.add(metric);
+ NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS.add(metric);
return metric;
}
}
- public static HistogramMetric buildReadAheadSizeMetric() {
+ public static YammerHistogramMetric buildReadAheadSizeMetric(MetricName metricName, MetricsLevel metricsLevel) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, readAheadSize);
+ YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
BASE_ATTRIBUTES_LISTENERS.add(metric);
+ READ_AHEAD_SIZE_METRICS.add(metric);
return metric;
}
}
- public static HistogramMetric buildReadAheadLimiterQueueTimeMetric() {
+ public static YammerHistogramMetric buildReadAheadLimiterQueueTimeMetric(MetricName metricName, MetricsLevel metricsLevel) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
- HistogramMetric metric = new HistogramMetric(metricsConfig, readAheadLimierQueueTime);
+ YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
BASE_ATTRIBUTES_LISTENERS.add(metric);
+ READ_AHEAD_LIMITER_QUEUE_TIME_METRICS.add(metric);
return metric;
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java
index 5601cf49e..22270feb0 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java
@@ -28,4 +28,8 @@ public enum S3ObjectStage {
public String getName() {
return name;
}
+
+ public String getUniqueKey() {
+ return "s3_object_stage_" + name;
+ }
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java
index bd4ac18ac..f5608e4c5 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java
@@ -67,7 +67,7 @@ public enum S3Operation {
S3Operation(S3MetricsType type, String name) {
this.type = type;
this.name = name;
- uniqueKey = type + "-" + name;
+ uniqueKey = type.getName() + "-" + name;
}
public String getName() {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java
index 42d50ed44..34aab3977 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java
@@ -50,6 +50,10 @@ public String getName() {
return name;
}
+ public String getUniqueKey() {
+ return operation.getUniqueKey() + "-" + name;
+ }
+
@Override
public String toString() {
return "S3Stage{" +
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java
index 53e8ee4e8..ef2d07313 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java
@@ -11,18 +11,22 @@
package com.automq.stream.s3.metrics.stats;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.wrapper.CounterMetric;
-import com.automq.stream.s3.metrics.wrapper.HistogramMetric;
+import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
+import com.yammer.metrics.core.MetricName;
public class NetworkStats {
private volatile static NetworkStats instance = null;
private final CounterMetric networkInboundUsageStats = S3StreamMetricsManager.buildNetworkInboundUsageMetric();
private final CounterMetric networkOutboundUsageStats = S3StreamMetricsManager.buildNetworkOutboundUsageMetric();
- private final HistogramMetric networkInboundLimiterQueueTimeStats = S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric();
- private final HistogramMetric networkOutboundLimiterQueueTimeStats = S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric();
+ private final YammerHistogramMetric networkInboundLimiterQueueTimeStats = S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(
+ new MetricName(NetworkStats.class, "NetworkInboundLimiterQueueTime"), MetricsLevel.INFO);
+ private final YammerHistogramMetric networkOutboundLimiterQueueTimeStats = S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(
+ new MetricName(NetworkStats.class, "NetworkOutboundLimiterQueueTime"), MetricsLevel.INFO);
private NetworkStats() {
}
@@ -42,7 +46,7 @@ public CounterMetric networkUsageStats(AsyncNetworkBandwidthLimiter.Type type) {
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND ? networkInboundUsageStats : networkOutboundUsageStats;
}
- public HistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type) {
+ public YammerHistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type) {
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND ? networkInboundLimiterQueueTimeStats : networkOutboundLimiterQueueTimeStats;
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectStats.java
index 9ae79e34f..4187a9ccf 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectStats.java
@@ -11,20 +11,25 @@
package com.automq.stream.s3.metrics.stats;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
import com.automq.stream.s3.metrics.wrapper.CounterMetric;
-import com.automq.stream.s3.metrics.wrapper.HistogramMetric;
+import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric;
+import com.yammer.metrics.core.MetricName;
public class S3ObjectStats {
private volatile static S3ObjectStats instance = null;
public final CounterMetric objectNumInTotalStats = S3StreamMetricsManager.buildObjectNumMetric();
- public final HistogramMetric objectStageUploadPartStats = S3StreamMetricsManager.buildObjectStageCostMetric(S3ObjectStage.UPLOAD_PART);
- public final HistogramMetric objectStageReadyCloseStats = S3StreamMetricsManager.buildObjectStageCostMetric(S3ObjectStage.READY_CLOSE);
- public final HistogramMetric objectStageTotalStats = S3StreamMetricsManager.buildObjectStageCostMetric(S3ObjectStage.TOTAL);
- public final HistogramMetric objectUploadSizeStats = S3StreamMetricsManager.buildObjectUploadSizeMetric();
- public final HistogramMetric objectDownloadSizeStats = S3StreamMetricsManager.buildObjectDownloadSizeMetric();
+ public final YammerHistogramMetric objectStageUploadPartStats = S3StreamMetricsManager.buildObjectStageCostMetric(
+ new MetricName(S3ObjectStats.class, S3ObjectStage.UPLOAD_PART.getUniqueKey()), MetricsLevel.DEBUG, S3ObjectStage.UPLOAD_PART);
+ public final YammerHistogramMetric objectStageReadyCloseStats = S3StreamMetricsManager.buildObjectStageCostMetric(
+ new MetricName(S3ObjectStats.class, S3ObjectStage.READY_CLOSE.getUniqueKey()), MetricsLevel.DEBUG, S3ObjectStage.READY_CLOSE);
+ public final YammerHistogramMetric objectStageTotalStats = S3StreamMetricsManager.buildObjectStageCostMetric(
+ new MetricName(S3ObjectStats.class, S3ObjectStage.TOTAL.getUniqueKey()), MetricsLevel.DEBUG, S3ObjectStage.TOTAL);
+ public final YammerHistogramMetric objectUploadSizeStats = S3StreamMetricsManager.buildObjectUploadSizeMetric(
+ new MetricName(S3ObjectStats.class, "ObjectUploadSize"), MetricsLevel.DEBUG);
private S3ObjectStats() {
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java
index 9e8030f22..1d6d8d719 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java
@@ -12,11 +12,13 @@
package com.automq.stream.s3.metrics.stats;
import com.automq.stream.s3.metrics.AttributesUtils;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsConstant;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.wrapper.CounterMetric;
-import com.automq.stream.s3.metrics.wrapper.HistogramMetric;
+import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric;
+import com.yammer.metrics.core.MetricName;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -24,22 +26,42 @@ public class S3OperationStats {
private volatile static S3OperationStats instance = null;
public final CounterMetric uploadSizeTotalStats = S3StreamMetricsManager.buildS3UploadSizeMetric();
public final CounterMetric downloadSizeTotalStats = S3StreamMetricsManager.buildS3DownloadSizeMetric();
- private final Map<String, HistogramMetric> getObjectSuccessStats = new ConcurrentHashMap<>();
- private final Map<String, HistogramMetric> getObjectFailedStats = new ConcurrentHashMap<>();
- private final Map<String, HistogramMetric> putObjectSuccessStats = new ConcurrentHashMap<>();
- private final Map<String, HistogramMetric> putObjectFailedStats = new ConcurrentHashMap<>();
- private final HistogramMetric deleteObjectSuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.DELETE_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
- private final HistogramMetric deleteObjectFailedStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.DELETE_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
- private final HistogramMetric deleteObjectsSuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.DELETE_OBJECTS, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
- private final HistogramMetric deleteObjectsFailedStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.DELETE_OBJECTS, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
- private final HistogramMetric createMultiPartUploadSuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.CREATE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
- private final HistogramMetric createMultiPartUploadFailedStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.CREATE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
- private final Map<String, HistogramMetric> uploadPartSuccessStats = new ConcurrentHashMap<>();
- private final Map<String, HistogramMetric> uploadPartFailedStats = new ConcurrentHashMap<>();
- private final HistogramMetric uploadPartCopySuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.UPLOAD_PART_COPY, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
- private final HistogramMetric uploadPartCopyFailedStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.UPLOAD_PART_COPY, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
- private final HistogramMetric completeMultiPartUploadSuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.COMPLETE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
- private final HistogramMetric completeMultiPartUploadFailedStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.COMPLETE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
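+ // Histograms are keyed by object-size bucket label and created lazily in getObjectStats/putObjectStats/uploadPartStats below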
+ private final Map<String, YammerHistogramMetric> getObjectSuccessStats = new ConcurrentHashMap<>();
+ private final Map<String, YammerHistogramMetric> getObjectFailedStats = new ConcurrentHashMap<>();
+ private final Map<String, YammerHistogramMetric> putObjectSuccessStats = new ConcurrentHashMap<>();
+ private final Map<String, YammerHistogramMetric> putObjectFailedStats = new ConcurrentHashMap<>();
+ private final YammerHistogramMetric deleteObjectSuccessStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(S3OperationStats.class, S3Operation.DELETE_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS),
+ MetricsLevel.INFO, S3Operation.DELETE_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
+ private final YammerHistogramMetric deleteObjectFailedStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(S3OperationStats.class, S3Operation.DELETE_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED),
+ MetricsLevel.INFO, S3Operation.DELETE_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
+ private final YammerHistogramMetric deleteObjectsSuccessStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(S3OperationStats.class, S3Operation.DELETE_OBJECTS.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS),
+ MetricsLevel.INFO, S3Operation.DELETE_OBJECTS, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
+ private final YammerHistogramMetric deleteObjectsFailedStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(S3OperationStats.class, S3Operation.DELETE_OBJECTS.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED),
+ MetricsLevel.INFO, S3Operation.DELETE_OBJECTS, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
+ private final YammerHistogramMetric createMultiPartUploadSuccessStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(S3OperationStats.class, S3Operation.CREATE_MULTI_PART_UPLOAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS),
+ MetricsLevel.INFO, S3Operation.CREATE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
+ private final YammerHistogramMetric createMultiPartUploadFailedStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(S3OperationStats.class, S3Operation.CREATE_MULTI_PART_UPLOAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED),
+ MetricsLevel.INFO, S3Operation.CREATE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
+ private final Map<String, YammerHistogramMetric> uploadPartSuccessStats = new ConcurrentHashMap<>();
+ private final Map<String, YammerHistogramMetric> uploadPartFailedStats = new ConcurrentHashMap<>();
+ private final YammerHistogramMetric uploadPartCopySuccessStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART_COPY.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS),
+ MetricsLevel.INFO, S3Operation.UPLOAD_PART_COPY, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
+ private final YammerHistogramMetric uploadPartCopyFailedStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART_COPY.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED),
+ MetricsLevel.INFO, S3Operation.UPLOAD_PART_COPY, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
+ private final YammerHistogramMetric completeMultiPartUploadSuccessStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(S3OperationStats.class, S3Operation.COMPLETE_MULTI_PART_UPLOAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS),
+ MetricsLevel.INFO, S3Operation.COMPLETE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
+ private final YammerHistogramMetric completeMultiPartUploadFailedStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(S3OperationStats.class, S3Operation.COMPLETE_MULTI_PART_UPLOAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED),
+ MetricsLevel.INFO, S3Operation.COMPLETE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
private S3OperationStats() {
}
@@ -55,56 +77,68 @@ public static S3OperationStats getInstance() {
return instance;
}
- public HistogramMetric getObjectStats(long size, boolean isSuccess) {
+ public YammerHistogramMetric getObjectStats(long size, boolean isSuccess) {
String label = AttributesUtils.getObjectBucketLabel(size);
if (isSuccess) {
return getObjectSuccessStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric(
- S3Operation.GET_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label));
+ new MetricName(S3OperationStats.class, S3Operation.GET_OBJECT.getUniqueKey()
+ + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS + label),
+ MetricsLevel.INFO, S3Operation.GET_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label));
} else {
return getObjectFailedStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric(
- S3Operation.GET_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label));
+ new MetricName(S3OperationStats.class, S3Operation.GET_OBJECT.getUniqueKey()
+ + S3StreamMetricsConstant.LABEL_STATUS_FAILED + label),
+ MetricsLevel.INFO, S3Operation.GET_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label));
}
}
- public HistogramMetric putObjectStats(long size, boolean isSuccess) {
+ public YammerHistogramMetric putObjectStats(long size, boolean isSuccess) {
String label = AttributesUtils.getObjectBucketLabel(size);
if (isSuccess) {
return putObjectSuccessStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric(
- S3Operation.PUT_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label));
+ new MetricName(S3OperationStats.class, S3Operation.PUT_OBJECT.getUniqueKey()
+ + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS + label),
+ MetricsLevel.INFO, S3Operation.PUT_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label));
} else {
return putObjectFailedStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric(
- S3Operation.PUT_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label));
+ new MetricName(S3OperationStats.class, S3Operation.PUT_OBJECT.getUniqueKey() +
+ S3StreamMetricsConstant.LABEL_STATUS_FAILED + label),
+ MetricsLevel.INFO, S3Operation.PUT_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label));
}
}
- public HistogramMetric uploadPartStats(long size, boolean isSuccess) {
+ public YammerHistogramMetric uploadPartStats(long size, boolean isSuccess) {
String label = AttributesUtils.getObjectBucketLabel(size);
if (isSuccess) {
return uploadPartSuccessStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric(
- S3Operation.UPLOAD_PART, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label));
+ new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART.getUniqueKey() +
+ S3StreamMetricsConstant.LABEL_STATUS_SUCCESS + label),
+ MetricsLevel.INFO, S3Operation.UPLOAD_PART, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label));
} else {
return uploadPartFailedStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric(
- S3Operation.UPLOAD_PART, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label));
+ new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART.getUniqueKey() +
+ S3StreamMetricsConstant.LABEL_STATUS_FAILED + label),
+ MetricsLevel.INFO, S3Operation.UPLOAD_PART, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label));
}
}
- public HistogramMetric deleteObjectStats(boolean isSuccess) {
+ public YammerHistogramMetric deleteObjectStats(boolean isSuccess) {
return isSuccess ? deleteObjectSuccessStats : deleteObjectFailedStats;
}
- public HistogramMetric deleteObjectsStats(boolean isSuccess) {
+ public YammerHistogramMetric deleteObjectsStats(boolean isSuccess) {
return isSuccess ? deleteObjectsSuccessStats : deleteObjectsFailedStats;
}
- public HistogramMetric uploadPartCopyStats(boolean isSuccess) {
+ public YammerHistogramMetric uploadPartCopyStats(boolean isSuccess) {
return isSuccess ? uploadPartCopySuccessStats : uploadPartCopyFailedStats;
}
- public HistogramMetric createMultiPartUploadStats(boolean isSuccess) {
+ public YammerHistogramMetric createMultiPartUploadStats(boolean isSuccess) {
return isSuccess ? createMultiPartUploadSuccessStats : createMultiPartUploadFailedStats;
}
- public HistogramMetric completeMultiPartUploadStats(boolean isSuccess) {
+ public YammerHistogramMetric completeMultiPartUploadStats(boolean isSuccess) {
return isSuccess ? completeMultiPartUploadSuccessStats : completeMultiPartUploadFailedStats;
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java
index 6c8f24959..8e46c592f 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java
@@ -11,41 +11,75 @@
package com.automq.stream.s3.metrics.stats;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsConstant;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.operations.S3Stage;
-import com.automq.stream.s3.metrics.wrapper.HistogramMetric;
+import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric;
+import com.yammer.metrics.core.MetricName;
public class StorageOperationStats {
private volatile static StorageOperationStats instance = null;
- public final HistogramMetric appendStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STORAGE);
- public final HistogramMetric appendWALBeforeStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_BEFORE);
- public final HistogramMetric appendWALBlockPolledStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_BLOCK_POLLED);
- public final HistogramMetric appendWALAwaitStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_AWAIT);
- public final HistogramMetric appendWALWriteStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_WRITE);
- public final HistogramMetric appendWALAfterStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_AFTER);
- public final HistogramMetric appendWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_COMPLETE);
- public final HistogramMetric appendCallbackStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
- public final HistogramMetric appendWALFullStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STORAGE_WAL_FULL);
- public final HistogramMetric appendLogCacheStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STORAGE_LOG_CACHE);
- public final HistogramMetric appendLogCacheFullStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
- public final HistogramMetric uploadWALPrepareStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.UPLOAD_WAL_PREPARE);
- public final HistogramMetric uploadWALUploadStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.UPLOAD_WAL_UPLOAD);
- public final HistogramMetric uploadWALCommitStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.UPLOAD_WAL_COMMIT);
- public final HistogramMetric uploadWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.UPLOAD_WAL_COMPLETE);
- public final HistogramMetric forceUploadWALAwaitStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.FORCE_UPLOAD_WAL_AWAIT);
- public final HistogramMetric forceUploadWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.FORCE_UPLOAD_WAL_COMPLETE);
- public final HistogramMetric readStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.READ_STORAGE);
- private final HistogramMetric readLogCacheHitStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.READ_STORAGE_LOG_CACHE, S3StreamMetricsConstant.LABEL_STATUS_HIT);
- private final HistogramMetric readLogCacheMissStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.READ_STORAGE_LOG_CACHE, S3StreamMetricsConstant.LABEL_STATUS_MISS);
- private final HistogramMetric readBlockCacheHitStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.READ_STORAGE_BLOCK_CACHE, S3StreamMetricsConstant.LABEL_STATUS_HIT);
- private final HistogramMetric readBlockCacheMissStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.READ_STORAGE_BLOCK_CACHE, S3StreamMetricsConstant.LABEL_STATUS_MISS);
- private final HistogramMetric blockCacheReadAheadSyncStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.BLOCK_CACHE_READ_AHEAD, S3StreamMetricsConstant.LABEL_STATUS_SYNC);
- private final HistogramMetric blockCacheReadAheadAsyncStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.BLOCK_CACHE_READ_AHEAD, S3StreamMetricsConstant.LABEL_STATUS_ASYNC);
- public final HistogramMetric readAheadSizeStats = S3StreamMetricsManager.buildReadAheadSizeMetric();
- public final HistogramMetric readAheadLimiterQueueTimeStats = S3StreamMetricsManager.buildReadAheadLimiterQueueTimeMetric();
+ public final YammerHistogramMetric appendStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.APPEND_STORAGE.getUniqueKey()), MetricsLevel.INFO, S3Operation.APPEND_STORAGE);
+ public final YammerHistogramMetric appendWALBeforeStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_BEFORE.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.APPEND_WAL_BEFORE);
+ public final YammerHistogramMetric appendWALBlockPolledStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_BLOCK_POLLED.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.APPEND_WAL_BLOCK_POLLED);
+ public final YammerHistogramMetric appendWALAwaitStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_AWAIT.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.APPEND_WAL_AWAIT);
+ public final YammerHistogramMetric appendWALWriteStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_WRITE.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.APPEND_WAL_WRITE);
+ public final YammerHistogramMetric appendWALAfterStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_AFTER.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.APPEND_WAL_AFTER);
+ public final YammerHistogramMetric appendWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_COMPLETE.getUniqueKey()), MetricsLevel.INFO, S3Stage.APPEND_WAL_COMPLETE);
+ public final YammerHistogramMetric appendCallbackStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.APPEND_STORAGE_APPEND_CALLBACK.getUniqueKey()), MetricsLevel.DEBUG, S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
+ public final YammerHistogramMetric appendWALFullStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.APPEND_STORAGE_WAL_FULL.getUniqueKey()), MetricsLevel.INFO, S3Operation.APPEND_STORAGE_WAL_FULL);
+ public final YammerHistogramMetric appendLogCacheStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.APPEND_STORAGE_LOG_CACHE.getUniqueKey()), MetricsLevel.INFO, S3Operation.APPEND_STORAGE_LOG_CACHE);
+ public final YammerHistogramMetric appendLogCacheFullStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL.getUniqueKey()), MetricsLevel.INFO, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
+ public final YammerHistogramMetric uploadWALPrepareStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.UPLOAD_WAL_PREPARE.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.UPLOAD_WAL_PREPARE);
+ public final YammerHistogramMetric uploadWALUploadStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.UPLOAD_WAL_UPLOAD.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.UPLOAD_WAL_UPLOAD);
+ public final YammerHistogramMetric uploadWALCommitStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.UPLOAD_WAL_COMMIT.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.UPLOAD_WAL_COMMIT);
+ public final YammerHistogramMetric uploadWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.UPLOAD_WAL_COMPLETE.getUniqueKey()), MetricsLevel.INFO, S3Stage.UPLOAD_WAL_COMPLETE);
+ public final YammerHistogramMetric forceUploadWALAwaitStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.FORCE_UPLOAD_WAL_AWAIT.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.FORCE_UPLOAD_WAL_AWAIT);
+ public final YammerHistogramMetric forceUploadWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Stage.FORCE_UPLOAD_WAL_COMPLETE.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.FORCE_UPLOAD_WAL_COMPLETE);
+ public final YammerHistogramMetric readStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.READ_STORAGE.getUniqueKey()), MetricsLevel.INFO, S3Operation.READ_STORAGE);
+ private final YammerHistogramMetric readLogCacheHitStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.READ_STORAGE_LOG_CACHE.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_HIT),
+ MetricsLevel.INFO, S3Operation.READ_STORAGE_LOG_CACHE, S3StreamMetricsConstant.LABEL_STATUS_HIT);
+ private final YammerHistogramMetric readLogCacheMissStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.READ_STORAGE_LOG_CACHE.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_MISS),
+ MetricsLevel.INFO, S3Operation.READ_STORAGE_LOG_CACHE, S3StreamMetricsConstant.LABEL_STATUS_MISS);
+ private final YammerHistogramMetric readBlockCacheHitStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.READ_STORAGE_BLOCK_CACHE.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_HIT),
+ MetricsLevel.INFO, S3Operation.READ_STORAGE_BLOCK_CACHE, S3StreamMetricsConstant.LABEL_STATUS_HIT);
+ private final YammerHistogramMetric readBlockCacheMissStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.READ_STORAGE_BLOCK_CACHE.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_MISS),
+ MetricsLevel.INFO, S3Operation.READ_STORAGE_BLOCK_CACHE, S3StreamMetricsConstant.LABEL_STATUS_MISS);
+ private final YammerHistogramMetric blockCacheReadAheadSyncStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.BLOCK_CACHE_READ_AHEAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SYNC),
+ MetricsLevel.INFO, S3Operation.BLOCK_CACHE_READ_AHEAD, S3StreamMetricsConstant.LABEL_STATUS_SYNC);
+ private final YammerHistogramMetric blockCacheReadAheadAsyncStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StorageOperationStats.class, S3Operation.BLOCK_CACHE_READ_AHEAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_ASYNC),
+ MetricsLevel.INFO, S3Operation.BLOCK_CACHE_READ_AHEAD, S3StreamMetricsConstant.LABEL_STATUS_ASYNC);
+ public final YammerHistogramMetric readAheadSizeStats = S3StreamMetricsManager.buildReadAheadSizeMetric(
+ new MetricName(StorageOperationStats.class, "ReadAheadSize"), MetricsLevel.INFO);
+ public final YammerHistogramMetric readAheadLimiterQueueTimeStats = S3StreamMetricsManager.buildReadAheadLimiterQueueTimeMetric(
+ new MetricName(StorageOperationStats.class, "ReadAheadLimitQueueTime"), MetricsLevel.INFO);
private StorageOperationStats() {
}
@@ -61,15 +95,15 @@ public static StorageOperationStats getInstance() {
return instance;
}
- public HistogramMetric readLogCacheStats(boolean isCacheHit) {
+ public YammerHistogramMetric readLogCacheStats(boolean isCacheHit) {
return isCacheHit ? readLogCacheHitStats : readLogCacheMissStats;
}
- public HistogramMetric readBlockCacheStats(boolean isCacheHit) {
+ public YammerHistogramMetric readBlockCacheStats(boolean isCacheHit) {
return isCacheHit ? readBlockCacheHitStats : readBlockCacheMissStats;
}
- public HistogramMetric blockCacheReadAheadStats(boolean isSync) {
+ public YammerHistogramMetric blockCacheReadAheadStats(boolean isSync) {
return isSync ? blockCacheReadAheadSyncStats : blockCacheReadAheadAsyncStats;
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StreamOperationStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StreamOperationStats.java
index f045e9ada..f2f168b2c 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StreamOperationStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StreamOperationStats.java
@@ -11,20 +11,31 @@
package com.automq.stream.s3.metrics.stats;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsConstant;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.wrapper.HistogramMetric;
+import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric;
+import com.yammer.metrics.core.MetricName;
public class StreamOperationStats {
private volatile static StreamOperationStats instance = null;
- public final HistogramMetric createStreamStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.CREATE_STREAM);
- public final HistogramMetric openStreamStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.OPEN_STREAM);
- public final HistogramMetric appendStreamStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STREAM);
- public final HistogramMetric fetchStreamStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.FETCH_STREAM);
- public final HistogramMetric trimStreamStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.TRIM_STREAM);
- private final HistogramMetric closeStreamSuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.CLOSE_STREAM, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
- private final HistogramMetric closeStreamFailStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.CLOSE_STREAM, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
+ public final YammerHistogramMetric createStreamStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StreamOperationStats.class, S3Operation.CREATE_STREAM.getUniqueKey()), MetricsLevel.INFO, S3Operation.CREATE_STREAM);
+ public final YammerHistogramMetric openStreamStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StreamOperationStats.class, S3Operation.OPEN_STREAM.getUniqueKey()), MetricsLevel.INFO, S3Operation.OPEN_STREAM);
+ public final YammerHistogramMetric appendStreamStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StreamOperationStats.class, S3Operation.APPEND_STREAM.getUniqueKey()), MetricsLevel.INFO, S3Operation.APPEND_STREAM);
+ public final YammerHistogramMetric fetchStreamStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StreamOperationStats.class, S3Operation.FETCH_STREAM.getUniqueKey()), MetricsLevel.INFO, S3Operation.FETCH_STREAM);
+ public final YammerHistogramMetric trimStreamStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StreamOperationStats.class, S3Operation.TRIM_STREAM.getUniqueKey()), MetricsLevel.INFO, S3Operation.TRIM_STREAM);
+ private final YammerHistogramMetric closeStreamSuccessStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StreamOperationStats.class, S3Operation.CLOSE_STREAM.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS),
+ MetricsLevel.INFO, S3Operation.CLOSE_STREAM, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS);
+ private final YammerHistogramMetric closeStreamFailStats = S3StreamMetricsManager.buildOperationMetric(
+ new MetricName(StreamOperationStats.class, S3Operation.CLOSE_STREAM.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED),
+ MetricsLevel.INFO, S3Operation.CLOSE_STREAM, S3StreamMetricsConstant.LABEL_STATUS_FAILED);
private StreamOperationStats() {
}
@@ -40,7 +51,7 @@ public static StreamOperationStats getInstance() {
return instance;
}
- public HistogramMetric closeStreamStats(boolean isSuccess) {
+ public YammerHistogramMetric closeStreamStats(boolean isSuccess) {
return isSuccess ? closeStreamSuccessStats : closeStreamFailStats;
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java
new file mode 100644
index 000000000..2b9066020
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.automq.stream.s3.metrics.wrapper;
+
+import com.automq.stream.s3.metrics.S3StreamMetricsConstant;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import java.util.List;
+import java.util.function.Supplier;
+
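+/**
+ * Bridges a collection of {@link YammerHistogramMetric} instances to OpenTelemetry by registering
+ * observable gauges for count, sum, p50, p99, mean and max. Each gauge callback pulls the latest
+ * values from the supplied histograms and skips any histogram whose metrics level is currently
+ * disabled (see {@link YammerHistogramMetric#shouldRecord()}).
+ */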
+public class HistogramInstrument {
+ private final ObservableLongGauge count;
+ private final ObservableLongGauge sum;
+ private final ObservableDoubleGauge histP50Value;
+ private final ObservableDoubleGauge histP99Value;
+ private final ObservableDoubleGauge histMeanValue;
+ private final ObservableDoubleGauge histMaxValue;
+
+ public HistogramInstrument(Meter meter, String name, String desc, String unit, Supplier<List<YammerHistogramMetric>> histogramsSupplier) {
+ this.count = meter.gaugeBuilder(name + S3StreamMetricsConstant.COUNT_METRIC_NAME_SUFFIX)
+ .setDescription(desc + " (count)")
+ .ofLongs()
+ .buildWithCallback(result -> {
+ List<YammerHistogramMetric> histograms = histogramsSupplier.get();
+ histograms.forEach(histogram -> {
+ if (histogram.shouldRecord()) {
+ result.record(histogram.count(), histogram.attributes);
+ }
+ });
+ });
+ this.sum = meter.gaugeBuilder(name + S3StreamMetricsConstant.SUM_METRIC_NAME_SUFFIX)
+ .setDescription(desc + " (sum)")
+ .ofLongs()
+ .setUnit(unit)
+ .buildWithCallback(result -> {
+ List<YammerHistogramMetric> histograms = histogramsSupplier.get();
+ histograms.forEach(histogram -> {
+ if (histogram.shouldRecord()) {
+ result.record(histogram.sum(), histogram.attributes);
+ }
+ });
+ });
+ this.histP50Value = meter.gaugeBuilder(name + S3StreamMetricsConstant.P50_METRIC_NAME_SUFFIX)
+ .setDescription(desc + " (50th percentile)")
+ .setUnit(unit)
+ .buildWithCallback(result -> {
+ List<YammerHistogramMetric> histograms = histogramsSupplier.get();
+ histograms.forEach(histogram -> {
+ if (histogram.shouldRecord()) {
+ result.record(histogram.p50(), histogram.attributes);
+ }
+ });
+ });
+ this.histP99Value = meter.gaugeBuilder(name + S3StreamMetricsConstant.P99_METRIC_NAME_SUFFIX)
+ .setDescription(desc + " (99th percentile)")
+ .setUnit(unit)
+ .buildWithCallback(result -> {
+ List<YammerHistogramMetric> histograms = histogramsSupplier.get();
+ histograms.forEach(histogram -> {
+ if (histogram.shouldRecord()) {
+ result.record(histogram.p99(), histogram.attributes);
+ }
+ });
+ });
+ this.histMeanValue = meter.gaugeBuilder(name + S3StreamMetricsConstant.MEAN_METRIC_NAME_SUFFIX)
+ .setDescription(desc + " (mean)")
+ .setUnit(unit)
+ .buildWithCallback(result -> {
+ List<YammerHistogramMetric> histograms = histogramsSupplier.get();
+ histograms.forEach(histogram -> {
+ if (histogram.shouldRecord()) {
+ result.record(histogram.mean(), histogram.attributes);
+ }
+ });
+ });
+ this.histMaxValue = meter.gaugeBuilder(name + S3StreamMetricsConstant.MAX_METRIC_NAME_SUFFIX)
+ .setDescription(desc + " (max)")
+ .setUnit(unit)
+ .buildWithCallback(result -> {
+ List<YammerHistogramMetric> histograms = histogramsSupplier.get();
+ histograms.forEach(histogram -> {
+ if (histogram.shouldRecord()) {
+ result.record(histogram.max(), histogram.attributes);
+ }
+ });
+ });
+ }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java
deleted file mode 100644
index b923a8839..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2024, AutoMQ CO.,LTD.
- *
- * Use of this software is governed by the Business Source License
- * included in the file BSL.md
- *
- * As of the Change Date specified in that file, in accordance with
- * the Business Source License, use of this software will be governed
- * by the Apache License, Version 2.0
- */
-
-package com.automq.stream.s3.metrics.wrapper;
-
-import com.automq.stream.s3.metrics.MetricsConfig;
-import com.automq.stream.s3.metrics.MetricsLevel;
-import io.opentelemetry.api.common.Attributes;
-import io.opentelemetry.api.metrics.LongHistogram;
-
-public class HistogramMetric extends ConfigurableMetrics {
- private final LongHistogram longHistogram;
-
- public HistogramMetric(MetricsConfig metricsConfig, LongHistogram longHistogram) {
- this(metricsConfig, Attributes.empty(), longHistogram);
- }
-
- public HistogramMetric(MetricsConfig metricsConfig, Attributes extraAttributes, LongHistogram longHistogram) {
- super(metricsConfig, extraAttributes);
- this.longHistogram = longHistogram;
- }
-
- public boolean record(MetricsLevel metricsLevel, long value) {
- if (metricsLevel.isWithin(this.metricsLevel)) {
- longHistogram.record(value, attributes);
- return true;
- }
- return false;
- }
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/YammerHistogramMetric.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/YammerHistogramMetric.java
new file mode 100644
index 000000000..a7b62d50d
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/YammerHistogramMetric.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package com.automq.stream.s3.metrics.wrapper;
+
+import com.automq.stream.s3.metrics.MetricsConfig;
+import com.automq.stream.s3.metrics.MetricsLevel;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricName;
+import io.opentelemetry.api.common.Attributes;
+
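+/**
+ * Histogram wrapper backed by a Yammer {@link Histogram} registered in
+ * {@link S3StreamMetricsManager#METRICS_REGISTRY}. Unlike the removed OpenTelemetry-based
+ * HistogramMetric, values are always recorded into the local histogram; the metrics level
+ * only controls whether they are exported (see {@link #shouldRecord()}).
+ */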
+public class YammerHistogramMetric extends ConfigurableMetrics {
+ private final Histogram histogram;
+ private final MetricsLevel currentMetricsLevel;
+
+ public YammerHistogramMetric(MetricName metricName, MetricsLevel currentMetricsLevel, MetricsConfig metricsConfig) {
+ this(metricName, currentMetricsLevel, metricsConfig, Attributes.empty());
+ }
+
+ public YammerHistogramMetric(MetricName metricName, MetricsLevel currentMetricsLevel, MetricsConfig metricsConfig, Attributes extraAttributes) {
+ super(metricsConfig, extraAttributes);
+ this.histogram = S3StreamMetricsManager.METRICS_REGISTRY.newHistogram(metricName, true);
+ this.currentMetricsLevel = currentMetricsLevel;
+ }
+
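+ // Snapshot accessors consumed by HistogramInstrument's gauge callbacks.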
+ public long count() {
+ return histogram.count();
+ }
+
+ public long sum() {
+ return (long) histogram.sum();
+ }
+
+ public double p50() {
+ return histogram.getSnapshot().getMedian();
+ }
+
+ public double p99() {
+ return histogram.getSnapshot().get99thPercentile();
+ }
+
+ public double mean() {
+ return histogram.mean();
+ }
+
+ public double max() {
+ return histogram.max();
+ }
+
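+ // Always update the backing histogram; level-based filtering happens at export time via shouldRecord().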
+ public void record(long value) {
+ histogram.update(value);
+ }
+
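+ // True when this metric's own level is within the currently configured metrics level.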
+ public boolean shouldRecord() {
+ return currentMetricsLevel.isWithin(metricsLevel);
+ }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index 3ee10188f..43c274d3b 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -16,7 +16,6 @@
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.NetworkStats;
-import com.automq.stream.s3.metrics.stats.S3ObjectStats;
import com.automq.stream.s3.metrics.stats.S3OperationStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;
@@ -210,7 +209,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, T
TimerUtil timerUtil = new TimerUtil();
networkInboundBandwidthLimiter.consume(throttleStrategy, end - start).whenCompleteAsync((v, ex) -> {
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND)
- .record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ .record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
cf.completeExceptionally(ex);
} else {
@@ -311,12 +310,11 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
scheduler.schedule(() -> mergedRangeRead0(path, start, end, cf), 100, TimeUnit.MILLISECONDS);
}
- S3OperationStats.getInstance().getObjectStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().getObjectStats(size, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
};
readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
.thenAccept(responsePublisher -> {
- S3ObjectStats.getInstance().objectDownloadSizeStats.record(MetricsLevel.INFO, size);
CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer();
responsePublisher.subscribe((bytes) -> {
// the aws client will copy DefaultHttpContent to heap ByteBuffer
@@ -327,7 +325,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
buf.release();
@@ -352,7 +350,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
TimerUtil timerUtil = new TimerUtil();
networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
- .record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ .record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
cf.completeExceptionally(ex);
} else {
@@ -372,12 +370,12 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize);
- S3OperationStats.getInstance().putObjectStats(objectSize, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().putObjectStats(objectSize, true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
data.release();
cf.complete(null);
}).exceptionally(ex -> {
- S3OperationStats.getInstance().putObjectStats(objectSize, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().putObjectStats(objectSize, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("PutObject for object {} fail", path, ex);
cf.completeExceptionally(ex);
@@ -400,10 +398,10 @@ public CompletableFuture<Void> delete(String path) {
TimerUtil timerUtil = new TimerUtil();
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
- S3OperationStats.getInstance().deleteObjectStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().deleteObjectStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
}).exceptionally(ex -> {
- S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().deleteObjectsStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return null;
});
@@ -423,11 +421,11 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
.build();
// TODO: handle not exist object, should we regard it as deleted or ignore it.
return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
- S3OperationStats.getInstance().deleteObjectsStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().deleteObjectsStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
}).exceptionally(ex -> {
- S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().deleteObjectsStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return Collections.emptyList();
});
@@ -448,10 +446,10 @@ void createMultipartUpload0(String path, CompletableFuture<String> cf) {
TimerUtil timerUtil = new TimerUtil();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
- S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().createMultiPartUploadStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(createMultipartUploadResponse.uploadId());
}).exceptionally(ex -> {
- S3OperationStats.getInstance().createMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().createMultiPartUploadStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("CreateMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
@@ -495,12 +493,12 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
CompletableFuture<UploadPartResponse> uploadPartCf = writeS3Client.uploadPart(request, body);
uploadPartCf.thenAccept(uploadPartResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size);
- S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().uploadPartStats(size, true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
part.release();
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
- S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().uploadPartStats(size, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex);
part.release();
@@ -532,12 +530,12 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
- S3OperationStats.getInstance().uploadPartCopyStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().uploadPartCopyStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber)
.eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
- S3OperationStats.getInstance().uploadPartCopyStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().uploadPartCopyStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
@@ -567,10 +565,10 @@ public void completeMultipartUpload0(String path, String uploadId, List<CompletedPart> parts, CompletableFuture<Void> cf) {
- S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(null);
}).exceptionally(ex -> {
- S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
index bd76c9e7e..76103c951 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java
@@ -148,14 +148,14 @@ public CompletableFuture<Void> close() {
objectPart = null;
}
- S3ObjectStats.getInstance().objectStageReadyCloseStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3ObjectStats.getInstance().objectStageReadyCloseStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
closeCf = new CompletableFuture<>();
CompletableFuture<Void> uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])));
FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, genCompleteParts())), closeCf);
closeCf.whenComplete((nil, ex) -> {
- S3ObjectStats.getInstance().objectStageTotalStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3ObjectStats.getInstance().objectStageTotalStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
S3ObjectStats.getInstance().objectNumInTotalStats.add(MetricsLevel.DEBUG, 1);
- S3ObjectStats.getInstance().objectUploadSizeStats.record(MetricsLevel.DEBUG, totalWriteSize.get());
+ S3ObjectStats.getInstance().objectUploadSizeStats.record(totalWriteSize.get());
});
return closeCf;
}
@@ -233,7 +233,7 @@ private void upload0() {
TimerUtil timerUtil = new TimerUtil();
FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf);
partCf.whenComplete((nil, ex) -> {
- S3ObjectStats.getInstance().objectStageUploadPartStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3ObjectStats.getInstance().objectStageUploadPartStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
});
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
index f074e2935..8821586a3 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java
@@ -152,13 +152,13 @@ public boolean hasBatchingPart() {
@Override
public CompletableFuture<Void> close() {
- S3ObjectStats.getInstance().objectStageReadyCloseStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3ObjectStats.getInstance().objectStageReadyCloseStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
int size = data.readableBytes();
FutureUtil.propagate(operator.write(path, data, throttleStrategy), cf);
cf.whenComplete((nil, e) -> {
- S3ObjectStats.getInstance().objectStageTotalStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ S3ObjectStats.getInstance().objectStageTotalStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
S3ObjectStats.getInstance().objectNumInTotalStats.add(MetricsLevel.DEBUG, 1);
- S3ObjectStats.getInstance().objectUploadSizeStats.record(MetricsLevel.DEBUG, size);
+ S3ObjectStats.getInstance().objectUploadSizeStats.record(size);
});
return cf;
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
index ecee701a0..c5d2c7ec8 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java
@@ -12,7 +12,6 @@
package com.automq.stream.s3.wal;
import com.automq.stream.s3.ByteBufAlloc;
-import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.wal.util.WALUtil;
@@ -118,6 +117,6 @@ public long size() {
@Override
public void polled() {
- StorageOperationStats.getInstance().appendWALBlockPolledStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().appendWALBlockPolledStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS));
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
index 6dad28e1f..697e249b4 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java
@@ -13,7 +13,6 @@
import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.Config;
-import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
@@ -375,7 +374,7 @@ public AppendResult append(TraceContext context, ByteBuf buf, int crc) throws Ov
return result;
} catch (OverCapacityException ex) {
buf.release();
- StorageOperationStats.getInstance().appendWALFullStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().appendWALFullStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
TraceUtils.endSpan(scope, ex);
throw ex;
}
@@ -407,8 +406,8 @@ private AppendResult append0(ByteBuf body, int crc) throws OverCapacityException
slidingWindowService.tryWriteBlock();
final AppendResult appendResult = new AppendResultImpl(expectedWriteOffset, appendResultFuture);
- appendResult.future().whenComplete((nil, ex) -> StorageOperationStats.getInstance().appendWALCompleteStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
- StorageOperationStats.getInstance().appendWALBeforeStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+ appendResult.future().whenComplete((nil, ex) -> StorageOperationStats.getInstance().appendWALCompleteStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
+ StorageOperationStats.getInstance().appendWALBeforeStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return appendResult;
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
index 64964e294..ceb15991c 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java
@@ -12,7 +12,6 @@
package com.automq.stream.s3.wal;
import com.automq.stream.s3.ByteBufAlloc;
-import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.wal.util.WALChannel;
@@ -354,7 +353,7 @@ private void writeBlockData(BlockBatch blocks) {
walChannel.retryWrite(block.data(), position);
}
walChannel.retryFlush();
- StorageOperationStats.getInstance().appendWALWriteStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().appendWALWriteStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS));
}
private void makeWriteOffsetMatchWindow(long newWindowEndOffset) {
@@ -533,7 +532,7 @@ public WriteBlockProcessor(BlockBatch blocks) {
@Override
public void run() {
- StorageOperationStats.getInstance().appendWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().appendWALAwaitStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS));
try {
writeBlock(this.blocks);
} catch (Exception e) {
@@ -564,7 +563,7 @@ public String toString() {
return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}';
}
});
- StorageOperationStats.getInstance().appendWALAfterStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
+ StorageOperationStats.getInstance().appendWALAfterStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS));
}
}
}
diff --git a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java
index 7512a0a7d..fbea91eaf 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java
@@ -19,9 +19,9 @@
import com.automq.stream.s3.metrics.MetricsConfig;
import com.automq.stream.s3.metrics.MetricsLevel;
+import com.yammer.metrics.core.MetricName;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
-import io.opentelemetry.api.metrics.LongHistogram;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -38,13 +38,13 @@ public void testConfigurableMetrics() {
Assertions.assertEquals(MetricsLevel.DEBUG, metric.metricsLevel);
Assertions.assertEquals(Attributes.builder().put("extra", "v").put("base", "v2").build(), metric.attributes);
- HistogramMetric histogramMetric = new HistogramMetric(new MetricsConfig(), Attributes.builder().put("extra", "v").build(),
- Mockito.mock(LongHistogram.class));
- Assertions.assertEquals(MetricsLevel.INFO, histogramMetric.metricsLevel);
+ YammerHistogramMetric yammerHistogramMetric = new YammerHistogramMetric(Mockito.mock(MetricName.class), MetricsLevel.INFO, new MetricsConfig(),
+ Attributes.builder().put("extra", "v").build());
+ Assertions.assertEquals(MetricsLevel.INFO, yammerHistogramMetric.metricsLevel);
- histogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, Attributes.builder().put("base", "v2").build()));
- Assertions.assertEquals(MetricsLevel.DEBUG, histogramMetric.metricsLevel);
- Assertions.assertEquals(Attributes.builder().put("extra", "v").put("base", "v2").build(), histogramMetric.attributes);
+ yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, Attributes.builder().put("base", "v2").build()));
+ Assertions.assertEquals(MetricsLevel.DEBUG, yammerHistogramMetric.metricsLevel);
+ Assertions.assertEquals(Attributes.builder().put("extra", "v").put("base", "v2").build(), yammerHistogramMetric.attributes);
}
@Test
@@ -56,11 +56,15 @@ public void testMetricsLevel() {
Assertions.assertTrue(metric.add(MetricsLevel.INFO, 1));
Assertions.assertTrue(metric.add(MetricsLevel.DEBUG, 1));
- HistogramMetric histogramMetric = new HistogramMetric(new MetricsConfig(MetricsLevel.INFO, null), Mockito.mock(LongHistogram.class));
- Assertions.assertTrue(histogramMetric.record(MetricsLevel.INFO, 1));
- Assertions.assertFalse(histogramMetric.record(MetricsLevel.DEBUG, 1));
- histogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, null));
- Assertions.assertTrue(histogramMetric.record(MetricsLevel.INFO, 1));
- Assertions.assertTrue(histogramMetric.record(MetricsLevel.DEBUG, 1));
+ YammerHistogramMetric yammerHistogramMetric = new YammerHistogramMetric(Mockito.mock(MetricName.class), MetricsLevel.INFO, new MetricsConfig(),
+ Attributes.builder().put("extra", "v").build());
+ Assertions.assertTrue(yammerHistogramMetric.shouldRecord());
+ yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, null));
+ Assertions.assertTrue(yammerHistogramMetric.shouldRecord());
+ yammerHistogramMetric = new YammerHistogramMetric(Mockito.mock(MetricName.class), MetricsLevel.DEBUG, new MetricsConfig(),
+ Attributes.builder().put("extra", "v").build());
+ Assertions.assertFalse(yammerHistogramMetric.shouldRecord());
+ yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, null));
+ Assertions.assertTrue(yammerHistogramMetric.shouldRecord());
}
}