From adb4887f40975a297904350006cccedf1fb7d73f Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Tue, 5 Mar 2024 20:56:07 +0800 Subject: [PATCH 1/4] feat(s3stream): reduce histogram sample size by dividing into explicit percentiles Signed-off-by: Shichao Nie --- s3stream/pom.xml | 5 + .../java/com/automq/stream/s3/S3Storage.java | 20 +-- .../java/com/automq/stream/s3/S3Stream.java | 10 +- .../com/automq/stream/s3/S3StreamClient.java | 4 +- .../stream/s3/cache/DefaultS3BlockCache.java | 2 +- .../com/automq/stream/s3/cache/LogCache.java | 4 +- .../stream/s3/cache/ReadAheadAgent.java | 2 +- .../automq/stream/s3/cache/StreamReader.java | 8 +- .../s3/metrics/S3StreamMetricsManager.java | 147 ++++++++---------- .../s3/metrics/operations/S3ObjectStage.java | 4 + .../stream/s3/metrics/operations/S3Stage.java | 4 + .../stream/s3/metrics/stats/NetworkStats.java | 12 +- .../s3/metrics/stats/S3ObjectStats.java | 17 +- .../s3/metrics/stats/S3OperationStats.java | 90 +++++++---- .../metrics/stats/StorageOperationStats.java | 94 +++++++---- .../metrics/stats/StreamOperationStats.java | 29 ++-- .../metrics/wrapper/HistogramInstrument.java | 102 ++++++++++++ .../s3/metrics/wrapper/HistogramMetric.java | 38 ----- .../wrapper/YammerHistogramMetric.java | 66 ++++++++ .../stream/s3/operator/DefaultS3Operator.java | 37 +++-- .../stream/s3/operator/MultiPartWriter.java | 8 +- .../stream/s3/operator/ProxyWriter.java | 6 +- .../com/automq/stream/s3/wal/BlockImpl.java | 2 +- .../automq/stream/s3/wal/BlockWALService.java | 6 +- .../stream/s3/wal/SlidingWindowService.java | 6 +- .../metrics/wrapper/MetricsWrapperTest.java | 29 ++-- 26 files changed, 480 insertions(+), 272 deletions(-) create mode 100644 s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java delete mode 100644 s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java create mode 100644 s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/YammerHistogramMetric.java diff --git a/s3stream/pom.xml b/s3stream/pom.xml index f28f908c5..27ed9c9e2 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -133,6 +133,11 @@ jnr-posix 3.1.19 + + com.yammer.metrics + metrics-core + 2.2.0 + diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 8c1dc718a..f3f279b61 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -334,7 +334,7 @@ public CompletableFuture append(AppendContext context, StreamRecordBatch s append0(context, writeRequest, false); cf.whenComplete((nil, ex) -> { streamRecord.release(); - StorageOperationStats.getInstance().appendStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().appendStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); }); return cf; } @@ -354,7 +354,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f if (!fromBackoff) { backoffRecords.offer(request); } - StorageOperationStats.getInstance().appendLogCacheFullStats.record(MetricsLevel.INFO, 0L); + StorageOperationStats.getInstance().appendLogCacheFullStats.record(0L); if (System.currentTimeMillis() - lastLogTimestamp > 1000L) { LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize); lastLogTimestamp = System.currentTimeMillis(); @@ -437,7 +437,7 @@ public CompletableFuture read(FetchContext context, TimerUtil timerUtil = new TimerUtil(); CompletableFuture cf = new CompletableFuture<>(); FutureUtil.propagate(read0(context, streamId, startOffset, endOffset, maxBytes), cf); - cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().readStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS))); + cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().readStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS))); return cf; } @@ -515,7 +515,7 @@ public CompletableFuture forceUpload(long streamId) { CompletableFuture cf = new CompletableFuture<>(); // Wait for a while to group force upload tasks. forceUploadTicker.tick().whenComplete((nil, ex) -> { - StorageOperationStats.getInstance().forceUploadWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().forceUploadWALAwaitStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS)); uploadDeltaWAL(streamId, true); // Wait for all tasks contains streamId complete. FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.stream() @@ -525,7 +525,7 @@ public CompletableFuture forceUpload(long streamId) { callbackSequencer.tryFree(streamId); } }); - cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().forceUploadWALCompleteStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS))); + cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().forceUploadWALCompleteStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS))); return cf; } @@ -557,7 +557,7 @@ private void handleAppendCallback0(WalWriteRequest request) { for (WalWriteRequest waitingAckRequest : waitingAckRequests) { waitingAckRequest.cf.complete(null); } - StorageOperationStats.getInstance().appendCallbackStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().appendCallbackStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS)); } private Lock getStreamCallbackLock(long streamId) { @@ -602,7 +602,7 @@ CompletableFuture uploadDeltaWAL(DeltaWALUploadTaskContext context) { inflightWALUploadTasks.add(context); backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL")); cf.whenComplete((nil, ex) -> { - StorageOperationStats.getInstance().uploadWALCompleteStats.record(MetricsLevel.INFO, context.timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().uploadWALCompleteStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS)); inflightWALUploadTasks.remove(context); if (ex != null) { LOGGER.error("upload delta WAL fail", ex); @@ -643,11 +643,11 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) { private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) { context.task.prepare().thenAcceptAsync(nil -> { - StorageOperationStats.getInstance().uploadWALPrepareStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().uploadWALPrepareStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS)); // 1. poll out current task and trigger upload. DeltaWALUploadTaskContext peek = walPrepareQueue.poll(); Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> StorageOperationStats.getInstance() - .uploadWALUploadStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS))); + .uploadWALUploadStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS))); // 2. add task to commit queue. boolean walObjectCommitQueueEmpty = walCommitQueue.isEmpty(); walCommitQueue.add(peek); @@ -664,7 +664,7 @@ private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) { private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) { context.task.commit().thenAcceptAsync(nil -> { - StorageOperationStats.getInstance().uploadWALCommitStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().uploadWALCommitStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS)); // 1. poll out current task walCommitQueue.poll(); if (context.cache.confirmOffset() != 0) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 0dcf92658..e4c94976f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -144,7 +144,7 @@ public CompletableFuture append(AppendContext context, RecordBatch }, LOGGER, "append"); pendingAppends.add(cf); cf.whenComplete((nil, ex) -> { - StreamOperationStats.getInstance().appendStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + StreamOperationStats.getInstance().appendStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); pendingAppends.remove(cf); }); return cf; @@ -190,7 +190,7 @@ public CompletableFuture fetch(FetchContext context, CompletableFuture cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch"); pendingFetches.add(cf); cf.whenComplete((rs, ex) -> { - StreamOperationStats.getInstance().fetchStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + StreamOperationStats.getInstance().fetchStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (ex != null) { Throwable cause = FutureUtil.cause(ex); if (!(cause instanceof FastReadFailFastException)) { @@ -256,7 +256,7 @@ public CompletableFuture trim(long newStartOffset) { CompletableFuture cf = new CompletableFuture<>(); lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf)); this.lastPendingTrim = cf; - cf.whenComplete((nil, ex) -> StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS))); + cf.whenComplete((nil, ex) -> StreamOperationStats.getInstance().trimStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS))); return cf; }, LOGGER, "trim"); } finally { @@ -307,10 +307,10 @@ public CompletableFuture close() { closeCf.whenComplete((nil, ex) -> { if (ex != null) { LOGGER.error("{} close fail", logIdent, ex); - StreamOperationStats.getInstance().closeStreamStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + StreamOperationStats.getInstance().closeStreamStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); } else { LOGGER.info("{} closed", logIdent); - StreamOperationStats.getInstance().closeStreamStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + StreamOperationStats.getInstance().closeStreamStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); } }); diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java index 78499d4b6..2cffa16d4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java @@ -83,7 +83,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage public CompletableFuture createAndOpenStream(CreateStreamOptions options) { TimerUtil timerUtil = new TimerUtil(); return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> { - StreamOperationStats.getInstance().createStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + StreamOperationStats.getInstance().createStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); return openStream0(streamId, options.epoch()); }), LOGGER, "createAndOpenStream"); } @@ -117,7 +117,7 @@ private CompletableFuture openStream0(long streamId, long epoch) { metadata.startOffset(), metadata.endOffset(), storage, streamManager, networkInboundBucket, networkOutboundBucket)); openedStreams.put(streamId, stream); - StreamOperationStats.getInstance().openStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + StreamOperationStats.getInstance().openStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); return stream; }); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java index 2e83e14e3..3c89be111 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java @@ -105,7 +105,7 @@ public CompletableFuture read(TraceContext traceContext, long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS); boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT; - StorageOperationStats.getInstance().readBlockCacheStats(isCacheHit).record(MetricsLevel.INFO, timeElapsed); + StorageOperationStats.getInstance().readBlockCacheStats(isCacheHit).record(timeElapsed); Span.fromContext(finalTraceContext.currentContext()).setAttribute("cache_hit", isCacheHit); if (LOGGER.isDebugEnabled()) { LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {}", diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java index 8e708d25a..0c514a163 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java @@ -94,7 +94,7 @@ public boolean put(StreamRecordBatch recordBatch) { } finally { readLock.unlock(); } - StorageOperationStats.getInstance().appendLogCacheStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().appendLogCacheStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); return full; } @@ -145,7 +145,7 @@ public List get(TraceContext context, long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS); boolean isCacheHit = !records.isEmpty() && records.get(0).getBaseOffset() <= startOffset; - StorageOperationStats.getInstance().readLogCacheStats(isCacheHit).record(MetricsLevel.INFO, timeElapsed); + StorageOperationStats.getInstance().readLogCacheStats(isCacheHit).record(timeElapsed); return records; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java index 4160e04f6..104650f08 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java @@ -93,7 +93,7 @@ public void updateReadAheadResult(long readAheadEndOffset, int readAheadSize) { lock.lock(); this.readAheadEndOffset = readAheadEndOffset; this.lastReadAheadSize = readAheadSize; - StorageOperationStats.getInstance().readAheadSizeStats.record(MetricsLevel.INFO, readAheadSize); + StorageOperationStats.getInstance().readAheadSizeStats.record(readAheadSize); if (logger.isDebugEnabled()) { logger.debug("update read ahead offset {}, size: {}, lastReadOffset: {}", readAheadEndOffset, readAheadSize, lastReadOffset); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java index 3cea50a19..3bd068fd2 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java @@ -131,7 +131,7 @@ public CompletableFuture> syncReadAhead(TraceContext tra completeInflightTask0(key, ex); } context.taskKeySet.clear(); - StorageOperationStats.getInstance().blockCacheReadAheadStats(true).record(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().blockCacheReadAheadStats(true).record(timer.elapsedAs(TimeUnit.NANOSECONDS)); }); } @@ -168,7 +168,7 @@ CompletableFuture> handleSyncReadAhead(TraceContext trac CompletableFuture throttleCf = inflightReadThrottle.acquire(traceContext, uuid, totalReserveSize); return throttleCf.thenComposeAsync(nil -> { // concurrently read all data blocks - StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(MetricsLevel.INFO, throttleTimer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(throttleTimer.elapsedAs(TimeUnit.NANOSECONDS)); for (int i = 0; i < streamDataBlocksToRead.size(); i++) { Pair pair = streamDataBlocksToRead.get(i); ObjectReader objectReader = pair.getLeft(); @@ -296,7 +296,7 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int completeInflightTask0(key, ex); } context.taskKeySet.clear(); - StorageOperationStats.getInstance().blockCacheReadAheadStats(false).record(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().blockCacheReadAheadStats(false).record(timer.elapsedAs(TimeUnit.NANOSECONDS)); }); } @@ -364,7 +364,7 @@ CompletableFuture handleAsyncReadAhead(long streamId, long startOffset, lo if (reserveResult.reserveSize() > 0) { TimerUtil throttleTimer = new TimerUtil(); inflightReadThrottle.acquire(TraceContext.DEFAULT, uuid, reserveResult.reserveSize()).thenAcceptAsync(nil -> { - StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(MetricsLevel.INFO, throttleTimer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(throttleTimer.elapsedAs(TimeUnit.NANOSECONDS)); // read data block if (context.taskKeySet.contains(taskKey)) { setInflightReadAheadStatus(taskKey, DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA); diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 5da77fbcf..d4e71d552 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -17,11 +17,13 @@ import com.automq.stream.s3.metrics.operations.S3Stage; import com.automq.stream.s3.metrics.wrapper.ConfigListener; import com.automq.stream.s3.metrics.wrapper.CounterMetric; -import com.automq.stream.s3.metrics.wrapper.HistogramMetric; +import com.automq.stream.s3.metrics.wrapper.HistogramInstrument; +import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric; import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongGauge; import java.util.ArrayList; @@ -31,23 +33,24 @@ public class S3StreamMetricsManager { private static final List BASE_ATTRIBUTES_LISTENERS = new ArrayList<>(); + public static final MetricsRegistry METRICS_REGISTRY = new MetricsRegistry(); private static LongCounter s3DownloadSizeInTotal = new NoopLongCounter(); private static LongCounter s3UploadSizeInTotal = new NoopLongCounter(); - private static LongHistogram operationLatency = new NoopLongHistogram(); + private static HistogramInstrument operationLatency; private static LongCounter objectNumInTotal = new NoopLongCounter(); - private static LongHistogram objectStageCost = new NoopLongHistogram(); - private static LongHistogram objectUploadSize = new NoopLongHistogram(); - private static LongHistogram objectDownloadSize = new NoopLongHistogram(); + private static HistogramInstrument objectStageCost; + private static HistogramInstrument objectUploadSize; + private static HistogramInstrument objectDownloadSize; private static LongCounter networkInboundUsageInTotal = new NoopLongCounter(); private static LongCounter networkOutboundUsageInTotal = new NoopLongCounter(); private static ObservableLongGauge networkInboundAvailableBandwidth = new NoopObservableLongGauge(); private static ObservableLongGauge networkOutboundAvailableBandwidth = new NoopObservableLongGauge(); private static ObservableLongGauge networkInboundLimiterQueueSize = new NoopObservableLongGauge(); private static ObservableLongGauge networkOutboundLimiterQueueSize = new NoopObservableLongGauge(); - private static LongHistogram networkInboundLimiterQueueTime = new NoopLongHistogram(); - private static LongHistogram networkOutboundLimiterQueueTime = new NoopLongHistogram(); - private static LongHistogram readAheadSize = new NoopLongHistogram(); - private static LongHistogram readAheadLimierQueueTime = new NoopLongHistogram(); + private static HistogramInstrument networkInboundLimiterQueueTime; + private static HistogramInstrument networkOutboundLimiterQueueTime; + private static HistogramInstrument readAheadSize; + private static HistogramInstrument readAheadLimierQueueTime; private static ObservableLongGauge deltaWalStartOffset = new NoopObservableLongGauge(); private static ObservableLongGauge deltaWalTrimmedOffset = new NoopObservableLongGauge(); private static ObservableLongGauge deltaWalCacheSize = new NoopObservableLongGauge(); @@ -102,31 +105,15 @@ public static void initMetrics(Meter meter, String prefix) { .setDescription("S3 upload size") .setUnit("bytes") .build(); - operationLatency = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OPERATION_LATENCY_METRIC_NAME) - .setDescription("Operations latency") - .setUnit("nanoseconds") - .ofLongs() - .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES) - .build(); + operationLatency = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OPERATION_LATENCY_METRIC_NAME, + "Operation latency", "nanoseconds"); objectNumInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.OBJECT_COUNT_METRIC_NAME) .setDescription("Objects count") .build(); - objectStageCost = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OBJECT_STAGE_COST_METRIC_NAME) - .setDescription("Objects stage cost") - .setUnit("nanoseconds") - .ofLongs() - .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES) - .build(); - objectUploadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OBJECT_UPLOAD_SIZE_METRIC_NAME) - .setDescription("Objects upload size") - .setUnit("bytes") - .ofLongs() - .build(); - objectDownloadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.OBJECT_DOWNLOAD_SIZE_METRIC_NAME) - .setDescription("Objects download size") - .setUnit("bytes") - .ofLongs() - .build(); + objectStageCost = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OBJECT_STAGE_COST_METRIC_NAME, + "Objects stage cost", "nanoseconds"); + objectUploadSize = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OBJECT_UPLOAD_SIZE_METRIC_NAME, + "Objects upload size", "bytes"); networkInboundUsageInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_USAGE_METRIC_NAME) .setDescription("Network inbound usage") .setUnit("bytes") @@ -169,29 +156,14 @@ public static void initMetrics(Meter meter, String prefix) { result.record((long) networkOutboundLimiterQueueSizeSupplier.get(), metricsConfig.getBaseAttributes()); } }); - networkInboundLimiterQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME) - .setDescription("Network inbound limiter queue time") - .setUnit("nanoseconds") - .ofLongs() - .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES) - .build(); - networkOutboundLimiterQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME) - .setDescription("Network outbound limiter queue time") - .setUnit("nanoseconds") - .ofLongs() - .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES) - .build(); - readAheadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.READ_AHEAD_SIZE_METRIC_NAME) - .setDescription("Read ahead size") - .setUnit("bytes") - .ofLongs() - .build(); - readAheadLimierQueueTime = meter.histogramBuilder(prefix + S3StreamMetricsConstant.READ_AHEAD_QUEUE_TIME_METRIC_NAME) - .setDescription("Read ahead limiter queue time") - .setUnit("nanoseconds") - .ofLongs() - .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES) - .build(); + networkInboundLimiterQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME, + "Network inbound limiter queue time", "nanoseconds"); + networkOutboundLimiterQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME, + "Network outbound limiter queue time", "nanoseconds"); + readAheadSize = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.READ_AHEAD_SIZE_METRIC_NAME, + "Read ahead size", "bytes"); + readAheadLimierQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.READ_AHEAD_QUEUE_TIME_METRIC_NAME, + "Read ahead limiter queue time", "nanoseconds"); deltaWalStartOffset = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.WAL_START_OFFSET) .setDescription("Delta WAL start offset") .ofLongs() @@ -353,35 +325,43 @@ public static CounterMetric buildS3DownloadSizeMetric() { } } - public static HistogramMetric buildStageOperationMetric(S3Stage stage) { + public static YammerHistogramMetric buildStageOperationMetric(MetricName metricName, MetricsLevel metricsLevel, S3Stage stage) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(stage), operationLatency); + YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, + metricsConfig, AttributesUtils.buildAttributes(stage)); BASE_ATTRIBUTES_LISTENERS.add(metric); + operationLatency.registerYammerHistogramMetric(metric); return metric; } } - public static HistogramMetric buildOperationMetric(S3Operation operation) { + public static YammerHistogramMetric buildOperationMetric(MetricName metricName, MetricsLevel metricsLevel, S3Operation operation) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(operation), operationLatency); + YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, + metricsConfig, AttributesUtils.buildAttributes(operation)); BASE_ATTRIBUTES_LISTENERS.add(metric); + operationLatency.registerYammerHistogramMetric(metric); return metric; } } - public static HistogramMetric buildOperationMetric(S3Operation operation, String status, String sizeLabelName) { + public static YammerHistogramMetric buildOperationMetric(MetricName metricName, MetricsLevel metricsLevel, + S3Operation operation, String status, String sizeLabelName) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(operation, - status, sizeLabelName), operationLatency); + YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, + AttributesUtils.buildAttributes(operation, status, sizeLabelName)); BASE_ATTRIBUTES_LISTENERS.add(metric); + operationLatency.registerYammerHistogramMetric(metric); return metric; } } - public static HistogramMetric buildOperationMetric(S3Operation operation, String status) { + public static YammerHistogramMetric buildOperationMetric(MetricName metricName, MetricsLevel metricsLevel, S3Operation operation, String status) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(operation, status), operationLatency); + YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, + AttributesUtils.buildAttributes(operation, status)); BASE_ATTRIBUTES_LISTENERS.add(metric); + operationLatency.registerYammerHistogramMetric(metric); return metric; } } @@ -394,26 +374,21 @@ public static CounterMetric buildObjectNumMetric() { } } - public static HistogramMetric buildObjectStageCostMetric(S3ObjectStage stage) { - synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(stage), objectStageCost); - BASE_ATTRIBUTES_LISTENERS.add(metric); - return metric; - } - } - - public static HistogramMetric buildObjectUploadSizeMetric() { + public static YammerHistogramMetric buildObjectStageCostMetric(MetricName metricName, MetricsLevel metricsLevel, S3ObjectStage stage) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, objectUploadSize); + YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, + AttributesUtils.buildAttributes(stage)); BASE_ATTRIBUTES_LISTENERS.add(metric); + objectStageCost.registerYammerHistogramMetric(metric); return metric; } } - public static HistogramMetric buildObjectDownloadSizeMetric() { + public static YammerHistogramMetric buildObjectUploadSizeMetric(MetricName metricName, MetricsLevel metricsLevel) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, objectDownloadSize); + YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); + objectStageCost.registerYammerHistogramMetric(metric); return metric; } } @@ -434,35 +409,39 @@ public static CounterMetric buildNetworkOutboundUsageMetric() { } } - public static HistogramMetric buildNetworkInboundLimiterQueueTimeMetric() { + public static YammerHistogramMetric buildNetworkInboundLimiterQueueTimeMetric(MetricName metricName, MetricsLevel metricsLevel) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, networkInboundLimiterQueueTime); + YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); + networkInboundLimiterQueueTime.registerYammerHistogramMetric(metric); return metric; } } - public static HistogramMetric buildNetworkOutboundLimiterQueueTimeMetric() { + public static YammerHistogramMetric buildNetworkOutboundLimiterQueueTimeMetric(MetricName metricName, MetricsLevel metricsLevel) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, networkOutboundLimiterQueueTime); + YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); + networkOutboundLimiterQueueTime.registerYammerHistogramMetric(metric); return metric; } } - public static HistogramMetric buildReadAheadSizeMetric() { + public static YammerHistogramMetric buildReadAheadSizeMetric(MetricName metricName, MetricsLevel metricsLevel) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, readAheadSize); + YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); + readAheadSize.registerYammerHistogramMetric(metric); return metric; } } - public static HistogramMetric buildReadAheadLimiterQueueTimeMetric() { + public static YammerHistogramMetric buildReadAheadLimiterQueueTimeMetric(MetricName metricName, MetricsLevel metricsLevel) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, readAheadLimierQueueTime); + YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); + readAheadLimierQueueTime.registerYammerHistogramMetric(metric); return metric; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java index 5601cf49e..22270feb0 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3ObjectStage.java @@ -28,4 +28,8 @@ public enum S3ObjectStage { public String getName() { return name; } + + public String getUniqueKey() { + return "s3_object_stage_" + name; + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java index 42d50ed44..34aab3977 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java @@ -50,6 +50,10 @@ public String getName() { return name; } + public String getUniqueKey() { + return operation.getUniqueKey() + "-" + name; + } + @Override public String toString() { return "S3Stage{" + diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java index 53e8ee4e8..ef2d07313 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java @@ -11,18 +11,22 @@ package com.automq.stream.s3.metrics.stats; +import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.wrapper.CounterMetric; -import com.automq.stream.s3.metrics.wrapper.HistogramMetric; +import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric; import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; +import com.yammer.metrics.core.MetricName; public class NetworkStats { private volatile static NetworkStats instance = null; private final CounterMetric networkInboundUsageStats = S3StreamMetricsManager.buildNetworkInboundUsageMetric(); private final CounterMetric networkOutboundUsageStats = S3StreamMetricsManager.buildNetworkOutboundUsageMetric(); - private final HistogramMetric networkInboundLimiterQueueTimeStats = S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(); - private final HistogramMetric networkOutboundLimiterQueueTimeStats = S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(); + private final YammerHistogramMetric networkInboundLimiterQueueTimeStats = S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric( + new MetricName(NetworkStats.class, "NetworkInboundLimiterQueueTime"), MetricsLevel.INFO); + private final YammerHistogramMetric networkOutboundLimiterQueueTimeStats = S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric( + new MetricName(NetworkStats.class, "NetworkOutboundLimiterQueueTime"), MetricsLevel.INFO); private NetworkStats() { } @@ -42,7 +46,7 @@ public CounterMetric networkUsageStats(AsyncNetworkBandwidthLimiter.Type type) { return type == AsyncNetworkBandwidthLimiter.Type.INBOUND ? networkInboundUsageStats : networkOutboundUsageStats; } - public HistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type) { + public YammerHistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type) { return type == AsyncNetworkBandwidthLimiter.Type.INBOUND ? networkInboundLimiterQueueTimeStats : networkOutboundLimiterQueueTimeStats; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectStats.java index 9ae79e34f..4187a9ccf 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3ObjectStats.java @@ -11,20 +11,25 @@ package com.automq.stream.s3.metrics.stats; +import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.operations.S3ObjectStage; import com.automq.stream.s3.metrics.wrapper.CounterMetric; -import com.automq.stream.s3.metrics.wrapper.HistogramMetric; +import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric; +import com.yammer.metrics.core.MetricName; public class S3ObjectStats { private volatile static S3ObjectStats instance = null; public final CounterMetric objectNumInTotalStats = S3StreamMetricsManager.buildObjectNumMetric(); - public final HistogramMetric objectStageUploadPartStats = S3StreamMetricsManager.buildObjectStageCostMetric(S3ObjectStage.UPLOAD_PART); - public final HistogramMetric objectStageReadyCloseStats = S3StreamMetricsManager.buildObjectStageCostMetric(S3ObjectStage.READY_CLOSE); - public final HistogramMetric objectStageTotalStats = S3StreamMetricsManager.buildObjectStageCostMetric(S3ObjectStage.TOTAL); - public final HistogramMetric objectUploadSizeStats = S3StreamMetricsManager.buildObjectUploadSizeMetric(); - public final HistogramMetric objectDownloadSizeStats = S3StreamMetricsManager.buildObjectDownloadSizeMetric(); + public final YammerHistogramMetric objectStageUploadPartStats = S3StreamMetricsManager.buildObjectStageCostMetric( + new MetricName(S3ObjectStats.class, S3ObjectStage.UPLOAD_PART.getUniqueKey()), MetricsLevel.DEBUG, S3ObjectStage.UPLOAD_PART); + public final YammerHistogramMetric objectStageReadyCloseStats = S3StreamMetricsManager.buildObjectStageCostMetric( + new MetricName(S3ObjectStats.class, S3ObjectStage.READY_CLOSE.getUniqueKey()), MetricsLevel.DEBUG, S3ObjectStage.READY_CLOSE); + public final YammerHistogramMetric objectStageTotalStats = S3StreamMetricsManager.buildObjectStageCostMetric( + new MetricName(S3ObjectStats.class, S3ObjectStage.TOTAL.getUniqueKey()), MetricsLevel.DEBUG, S3ObjectStage.TOTAL); + public final YammerHistogramMetric objectUploadSizeStats = S3StreamMetricsManager.buildObjectUploadSizeMetric( + new MetricName(S3ObjectStats.class, "ObjectUploadSize"), MetricsLevel.DEBUG); private S3ObjectStats() { } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java index 9e8030f22..e10fec3bc 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java @@ -12,11 +12,13 @@ package com.automq.stream.s3.metrics.stats; import com.automq.stream.s3.metrics.AttributesUtils; +import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.S3StreamMetricsConstant; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.operations.S3Operation; import com.automq.stream.s3.metrics.wrapper.CounterMetric; -import com.automq.stream.s3.metrics.wrapper.HistogramMetric; +import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric; +import com.yammer.metrics.core.MetricName; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -24,22 +26,42 @@ public class S3OperationStats { private volatile static S3OperationStats instance = null; public final CounterMetric uploadSizeTotalStats = S3StreamMetricsManager.buildS3UploadSizeMetric(); public final CounterMetric downloadSizeTotalStats = S3StreamMetricsManager.buildS3DownloadSizeMetric(); - private final Map getObjectSuccessStats = new ConcurrentHashMap<>(); - private final Map getObjectFailedStats = new ConcurrentHashMap<>(); - private final Map putObjectSuccessStats = new ConcurrentHashMap<>(); - private final Map putObjectFailedStats = new ConcurrentHashMap<>(); - private final HistogramMetric deleteObjectSuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.DELETE_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); - private final HistogramMetric deleteObjectFailedStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.DELETE_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED); - private final HistogramMetric deleteObjectsSuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.DELETE_OBJECTS, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); - private final HistogramMetric deleteObjectsFailedStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.DELETE_OBJECTS, S3StreamMetricsConstant.LABEL_STATUS_FAILED); - private final HistogramMetric createMultiPartUploadSuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.CREATE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); - private final HistogramMetric createMultiPartUploadFailedStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.CREATE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_FAILED); - private final Map uploadPartSuccessStats = new ConcurrentHashMap<>(); - private final Map uploadPartFailedStats = new ConcurrentHashMap<>(); - private final HistogramMetric uploadPartCopySuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.UPLOAD_PART_COPY, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); - private final HistogramMetric uploadPartCopyFailedStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.UPLOAD_PART_COPY, S3StreamMetricsConstant.LABEL_STATUS_FAILED); - private final HistogramMetric completeMultiPartUploadSuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.COMPLETE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); - private final HistogramMetric completeMultiPartUploadFailedStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.COMPLETE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_FAILED); + private final Map getObjectSuccessStats = new ConcurrentHashMap<>(); + private final Map getObjectFailedStats = new ConcurrentHashMap<>(); + private final Map putObjectSuccessStats = new ConcurrentHashMap<>(); + private final Map putObjectFailedStats = new ConcurrentHashMap<>(); + private final YammerHistogramMetric deleteObjectSuccessStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(S3OperationStats.class, S3Operation.DELETE_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + MetricsLevel.INFO, S3Operation.DELETE_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); + private final YammerHistogramMetric deleteObjectFailedStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(S3OperationStats.class, S3Operation.DELETE_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + MetricsLevel.INFO, S3Operation.DELETE_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED); + private final YammerHistogramMetric deleteObjectsSuccessStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(S3OperationStats.class, S3Operation.DELETE_OBJECTS.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + MetricsLevel.INFO, S3Operation.DELETE_OBJECTS, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); + private final YammerHistogramMetric deleteObjectsFailedStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(S3OperationStats.class, S3Operation.DELETE_OBJECTS.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + MetricsLevel.INFO, S3Operation.DELETE_OBJECTS, S3StreamMetricsConstant.LABEL_STATUS_FAILED); + private final YammerHistogramMetric createMultiPartUploadSuccessStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(S3OperationStats.class, S3Operation.CREATE_MULTI_PART_UPLOAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + MetricsLevel.INFO, S3Operation.CREATE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); + private final YammerHistogramMetric createMultiPartUploadFailedStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(S3OperationStats.class, S3Operation.CREATE_MULTI_PART_UPLOAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + MetricsLevel.INFO, S3Operation.CREATE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_FAILED); + private final Map uploadPartSuccessStats = new ConcurrentHashMap<>(); + private final Map uploadPartFailedStats = new ConcurrentHashMap<>(); + private final YammerHistogramMetric uploadPartCopySuccessStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART_COPY.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + MetricsLevel.INFO, S3Operation.UPLOAD_PART_COPY, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); + private final YammerHistogramMetric uploadPartCopyFailedStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART_COPY.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + MetricsLevel.INFO, S3Operation.UPLOAD_PART_COPY, S3StreamMetricsConstant.LABEL_STATUS_FAILED); + private final YammerHistogramMetric completeMultiPartUploadSuccessStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(S3OperationStats.class, S3Operation.COMPLETE_MULTI_PART_UPLOAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + MetricsLevel.INFO, S3Operation.COMPLETE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); + private final YammerHistogramMetric completeMultiPartUploadFailedStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(S3OperationStats.class, S3Operation.COMPLETE_MULTI_PART_UPLOAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + MetricsLevel.INFO, S3Operation.COMPLETE_MULTI_PART_UPLOAD, S3StreamMetricsConstant.LABEL_STATUS_FAILED); private S3OperationStats() { } @@ -55,56 +77,62 @@ public static S3OperationStats getInstance() { return instance; } - public HistogramMetric getObjectStats(long size, boolean isSuccess) { + public YammerHistogramMetric getObjectStats(long size, boolean isSuccess) { String label = AttributesUtils.getObjectBucketLabel(size); if (isSuccess) { return getObjectSuccessStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - S3Operation.GET_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label)); + new MetricName(S3OperationStats.class, S3Operation.GET_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + MetricsLevel.INFO, S3Operation.GET_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label)); } else { return getObjectFailedStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - S3Operation.GET_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label)); + new MetricName(S3OperationStats.class, S3Operation.GET_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + MetricsLevel.INFO, S3Operation.GET_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label)); } } - public HistogramMetric putObjectStats(long size, boolean isSuccess) { + public YammerHistogramMetric putObjectStats(long size, boolean isSuccess) { String label = AttributesUtils.getObjectBucketLabel(size); if (isSuccess) { return putObjectSuccessStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - S3Operation.PUT_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label)); + new MetricName(S3OperationStats.class, S3Operation.PUT_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + MetricsLevel.INFO, S3Operation.PUT_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label)); } else { return putObjectFailedStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - S3Operation.PUT_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label)); + new MetricName(S3OperationStats.class, S3Operation.PUT_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + MetricsLevel.INFO, S3Operation.PUT_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label)); } } - public HistogramMetric uploadPartStats(long size, boolean isSuccess) { + public YammerHistogramMetric uploadPartStats(long size, boolean isSuccess) { String label = AttributesUtils.getObjectBucketLabel(size); if (isSuccess) { return uploadPartSuccessStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - S3Operation.UPLOAD_PART, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label)); + new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + MetricsLevel.INFO, S3Operation.UPLOAD_PART, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label)); } else { return uploadPartFailedStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - S3Operation.UPLOAD_PART, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label)); + new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + MetricsLevel.INFO, S3Operation.UPLOAD_PART, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label)); } } - public HistogramMetric deleteObjectStats(boolean isSuccess) { + public YammerHistogramMetric deleteObjectStats(boolean isSuccess) { return isSuccess ? deleteObjectSuccessStats : deleteObjectFailedStats; } - public HistogramMetric deleteObjectsStats(boolean isSuccess) { + public YammerHistogramMetric deleteObjectsStats(boolean isSuccess) { return isSuccess ? deleteObjectsSuccessStats : deleteObjectsFailedStats; } - public HistogramMetric uploadPartCopyStats(boolean isSuccess) { + public YammerHistogramMetric uploadPartCopyStats(boolean isSuccess) { return isSuccess ? uploadPartCopySuccessStats : uploadPartCopyFailedStats; } - public HistogramMetric createMultiPartUploadStats(boolean isSuccess) { + public YammerHistogramMetric createMultiPartUploadStats(boolean isSuccess) { return isSuccess ? createMultiPartUploadSuccessStats : createMultiPartUploadFailedStats; } - public HistogramMetric completeMultiPartUploadStats(boolean isSuccess) { + public YammerHistogramMetric completeMultiPartUploadStats(boolean isSuccess) { return isSuccess ? completeMultiPartUploadSuccessStats : completeMultiPartUploadFailedStats; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java index 6c8f24959..8e46c592f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java @@ -11,41 +11,75 @@ package com.automq.stream.s3.metrics.stats; +import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.S3StreamMetricsConstant; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.operations.S3Operation; import com.automq.stream.s3.metrics.operations.S3Stage; -import com.automq.stream.s3.metrics.wrapper.HistogramMetric; +import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric; +import com.yammer.metrics.core.MetricName; public class StorageOperationStats { private volatile static StorageOperationStats instance = null; - public final HistogramMetric appendStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STORAGE); - public final HistogramMetric appendWALBeforeStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_BEFORE); - public final HistogramMetric appendWALBlockPolledStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_BLOCK_POLLED); - public final HistogramMetric appendWALAwaitStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_AWAIT); - public final HistogramMetric appendWALWriteStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_WRITE); - public final HistogramMetric appendWALAfterStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_AFTER); - public final HistogramMetric appendWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.APPEND_WAL_COMPLETE); - public final HistogramMetric appendCallbackStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STORAGE_APPEND_CALLBACK); - public final HistogramMetric appendWALFullStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STORAGE_WAL_FULL); - public final HistogramMetric appendLogCacheStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STORAGE_LOG_CACHE); - public final HistogramMetric appendLogCacheFullStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL); - public final HistogramMetric uploadWALPrepareStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.UPLOAD_WAL_PREPARE); - public final HistogramMetric uploadWALUploadStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.UPLOAD_WAL_UPLOAD); - public final HistogramMetric uploadWALCommitStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.UPLOAD_WAL_COMMIT); - public final HistogramMetric uploadWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.UPLOAD_WAL_COMPLETE); - public final HistogramMetric forceUploadWALAwaitStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.FORCE_UPLOAD_WAL_AWAIT); - public final HistogramMetric forceUploadWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric(S3Stage.FORCE_UPLOAD_WAL_COMPLETE); - public final HistogramMetric readStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.READ_STORAGE); - private final HistogramMetric readLogCacheHitStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.READ_STORAGE_LOG_CACHE, S3StreamMetricsConstant.LABEL_STATUS_HIT); - private final HistogramMetric readLogCacheMissStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.READ_STORAGE_LOG_CACHE, S3StreamMetricsConstant.LABEL_STATUS_MISS); - private final HistogramMetric readBlockCacheHitStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.READ_STORAGE_BLOCK_CACHE, S3StreamMetricsConstant.LABEL_STATUS_HIT); - private final HistogramMetric readBlockCacheMissStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.READ_STORAGE_BLOCK_CACHE, S3StreamMetricsConstant.LABEL_STATUS_MISS); - private final HistogramMetric blockCacheReadAheadSyncStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.BLOCK_CACHE_READ_AHEAD, S3StreamMetricsConstant.LABEL_STATUS_SYNC); - private final HistogramMetric blockCacheReadAheadAsyncStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.BLOCK_CACHE_READ_AHEAD, S3StreamMetricsConstant.LABEL_STATUS_ASYNC); - public final HistogramMetric readAheadSizeStats = S3StreamMetricsManager.buildReadAheadSizeMetric(); - public final HistogramMetric readAheadLimiterQueueTimeStats = S3StreamMetricsManager.buildReadAheadLimiterQueueTimeMetric(); + public final YammerHistogramMetric appendStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.APPEND_STORAGE.getUniqueKey()), MetricsLevel.INFO, S3Operation.APPEND_STORAGE); + public final YammerHistogramMetric appendWALBeforeStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_BEFORE.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.APPEND_WAL_BEFORE); + public final YammerHistogramMetric appendWALBlockPolledStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_BLOCK_POLLED.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.APPEND_WAL_BLOCK_POLLED); + public final YammerHistogramMetric appendWALAwaitStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_AWAIT.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.APPEND_WAL_AWAIT); + public final YammerHistogramMetric appendWALWriteStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_WRITE.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.APPEND_WAL_WRITE); + public final YammerHistogramMetric appendWALAfterStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_AFTER.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.APPEND_WAL_AFTER); + public final YammerHistogramMetric appendWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.APPEND_WAL_COMPLETE.getUniqueKey()), MetricsLevel.INFO, S3Stage.APPEND_WAL_COMPLETE); + public final YammerHistogramMetric appendCallbackStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.APPEND_STORAGE_APPEND_CALLBACK.getUniqueKey()), MetricsLevel.DEBUG, S3Operation.APPEND_STORAGE_APPEND_CALLBACK); + public final YammerHistogramMetric appendWALFullStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.APPEND_STORAGE_WAL_FULL.getUniqueKey()), MetricsLevel.INFO, S3Operation.APPEND_STORAGE_WAL_FULL); + public final YammerHistogramMetric appendLogCacheStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.APPEND_STORAGE_LOG_CACHE.getUniqueKey()), MetricsLevel.INFO, S3Operation.APPEND_STORAGE_LOG_CACHE); + public final YammerHistogramMetric appendLogCacheFullStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL.getUniqueKey()), MetricsLevel.INFO, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL); + public final YammerHistogramMetric uploadWALPrepareStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.UPLOAD_WAL_PREPARE.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.UPLOAD_WAL_PREPARE); + public final YammerHistogramMetric uploadWALUploadStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.UPLOAD_WAL_UPLOAD.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.UPLOAD_WAL_UPLOAD); + public final YammerHistogramMetric uploadWALCommitStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.UPLOAD_WAL_COMMIT.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.UPLOAD_WAL_COMMIT); + public final YammerHistogramMetric uploadWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.UPLOAD_WAL_COMPLETE.getUniqueKey()), MetricsLevel.INFO, S3Stage.UPLOAD_WAL_COMPLETE); + public final YammerHistogramMetric forceUploadWALAwaitStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.FORCE_UPLOAD_WAL_AWAIT.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.FORCE_UPLOAD_WAL_AWAIT); + public final YammerHistogramMetric forceUploadWALCompleteStats = S3StreamMetricsManager.buildStageOperationMetric( + new MetricName(StorageOperationStats.class, S3Stage.FORCE_UPLOAD_WAL_COMPLETE.getUniqueKey()), MetricsLevel.DEBUG, S3Stage.FORCE_UPLOAD_WAL_COMPLETE); + public final YammerHistogramMetric readStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.READ_STORAGE.getUniqueKey()), MetricsLevel.INFO, S3Operation.READ_STORAGE); + private final YammerHistogramMetric readLogCacheHitStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.READ_STORAGE_LOG_CACHE.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_HIT), + MetricsLevel.INFO, S3Operation.READ_STORAGE_LOG_CACHE, S3StreamMetricsConstant.LABEL_STATUS_HIT); + private final YammerHistogramMetric readLogCacheMissStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.READ_STORAGE_LOG_CACHE.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_MISS), + MetricsLevel.INFO, S3Operation.READ_STORAGE_LOG_CACHE, S3StreamMetricsConstant.LABEL_STATUS_MISS); + private final YammerHistogramMetric readBlockCacheHitStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.READ_STORAGE_BLOCK_CACHE.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_HIT), + MetricsLevel.INFO, S3Operation.READ_STORAGE_BLOCK_CACHE, S3StreamMetricsConstant.LABEL_STATUS_HIT); + private final YammerHistogramMetric readBlockCacheMissStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.READ_STORAGE_BLOCK_CACHE.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_MISS), + MetricsLevel.INFO, S3Operation.READ_STORAGE_BLOCK_CACHE, S3StreamMetricsConstant.LABEL_STATUS_MISS); + private final YammerHistogramMetric blockCacheReadAheadSyncStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.BLOCK_CACHE_READ_AHEAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SYNC), + MetricsLevel.INFO, S3Operation.BLOCK_CACHE_READ_AHEAD, S3StreamMetricsConstant.LABEL_STATUS_SYNC); + private final YammerHistogramMetric blockCacheReadAheadAsyncStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StorageOperationStats.class, S3Operation.BLOCK_CACHE_READ_AHEAD.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_ASYNC), + MetricsLevel.INFO, S3Operation.BLOCK_CACHE_READ_AHEAD, S3StreamMetricsConstant.LABEL_STATUS_ASYNC); + public final YammerHistogramMetric readAheadSizeStats = S3StreamMetricsManager.buildReadAheadSizeMetric( + new MetricName(StorageOperationStats.class, "ReadAheadSize"), MetricsLevel.INFO); + public final YammerHistogramMetric readAheadLimiterQueueTimeStats = S3StreamMetricsManager.buildReadAheadLimiterQueueTimeMetric( + new MetricName(StorageOperationStats.class, "ReadAheadLimitQueueTime"), MetricsLevel.INFO); private StorageOperationStats() { } @@ -61,15 +95,15 @@ public static StorageOperationStats getInstance() { return instance; } - public HistogramMetric readLogCacheStats(boolean isCacheHit) { + public YammerHistogramMetric readLogCacheStats(boolean isCacheHit) { return isCacheHit ? readLogCacheHitStats : readLogCacheMissStats; } - public HistogramMetric readBlockCacheStats(boolean isCacheHit) { + public YammerHistogramMetric readBlockCacheStats(boolean isCacheHit) { return isCacheHit ? readBlockCacheHitStats : readBlockCacheMissStats; } - public HistogramMetric blockCacheReadAheadStats(boolean isSync) { + public YammerHistogramMetric blockCacheReadAheadStats(boolean isSync) { return isSync ? blockCacheReadAheadSyncStats : blockCacheReadAheadAsyncStats; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StreamOperationStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StreamOperationStats.java index f045e9ada..f2f168b2c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StreamOperationStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StreamOperationStats.java @@ -11,20 +11,31 @@ package com.automq.stream.s3.metrics.stats; +import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.S3StreamMetricsConstant; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.operations.S3Operation; -import com.automq.stream.s3.metrics.wrapper.HistogramMetric; +import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric; +import com.yammer.metrics.core.MetricName; public class StreamOperationStats { private volatile static StreamOperationStats instance = null; - public final HistogramMetric createStreamStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.CREATE_STREAM); - public final HistogramMetric openStreamStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.OPEN_STREAM); - public final HistogramMetric appendStreamStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.APPEND_STREAM); - public final HistogramMetric fetchStreamStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.FETCH_STREAM); - public final HistogramMetric trimStreamStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.TRIM_STREAM); - private final HistogramMetric closeStreamSuccessStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.CLOSE_STREAM, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); - private final HistogramMetric closeStreamFailStats = S3StreamMetricsManager.buildOperationMetric(S3Operation.CLOSE_STREAM, S3StreamMetricsConstant.LABEL_STATUS_FAILED); + public final YammerHistogramMetric createStreamStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StreamOperationStats.class, S3Operation.CREATE_STREAM.getUniqueKey()), MetricsLevel.INFO, S3Operation.CREATE_STREAM); + public final YammerHistogramMetric openStreamStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StreamOperationStats.class, S3Operation.OPEN_STREAM.getUniqueKey()), MetricsLevel.INFO, S3Operation.OPEN_STREAM); + public final YammerHistogramMetric appendStreamStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StreamOperationStats.class, S3Operation.APPEND_STREAM.getUniqueKey()), MetricsLevel.INFO, S3Operation.APPEND_STREAM); + public final YammerHistogramMetric fetchStreamStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StreamOperationStats.class, S3Operation.FETCH_STREAM.getUniqueKey()), MetricsLevel.INFO, S3Operation.FETCH_STREAM); + public final YammerHistogramMetric trimStreamStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StreamOperationStats.class, S3Operation.TRIM_STREAM.getUniqueKey()), MetricsLevel.INFO, S3Operation.TRIM_STREAM); + private final YammerHistogramMetric closeStreamSuccessStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StreamOperationStats.class, S3Operation.CLOSE_STREAM.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + MetricsLevel.INFO, S3Operation.CLOSE_STREAM, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS); + private final YammerHistogramMetric closeStreamFailStats = S3StreamMetricsManager.buildOperationMetric( + new MetricName(StreamOperationStats.class, S3Operation.CLOSE_STREAM.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + MetricsLevel.INFO, S3Operation.CLOSE_STREAM, S3StreamMetricsConstant.LABEL_STATUS_FAILED); private StreamOperationStats() { } @@ -40,7 +51,7 @@ public static StreamOperationStats getInstance() { return instance; } - public HistogramMetric closeStreamStats(boolean isSuccess) { + public YammerHistogramMetric closeStreamStats(boolean isSuccess) { return isSuccess ? closeStreamSuccessStats : closeStreamFailStats; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java new file mode 100644 index 000000000..ef6617828 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.metrics.wrapper; + +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; +import io.opentelemetry.api.metrics.ObservableLongGauge; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class HistogramInstrument { + private final List histograms; + private final ObservableLongGauge count; + private final ObservableLongGauge sum; + private final ObservableDoubleGauge histP50Value; + private final ObservableDoubleGauge histP99Value; + private final ObservableDoubleGauge histMeanValue; + private final ObservableDoubleGauge histMaxValue; + + public HistogramInstrument(Meter meter, String name, String desc, String unit) { + this.histograms = new CopyOnWriteArrayList<>(); + this.count = meter.gaugeBuilder(name) + .setDescription(desc + " (count)") + .ofLongs() + .buildWithCallback(result -> { + histograms.forEach(histogram -> { + if (histogram.shouldRecord()) { + result.record(histogram.count(), histogram.attributes); + } + }); + }); + this.sum = meter.gaugeBuilder(name) + .setDescription(desc + " (sum)") + .ofLongs() + .buildWithCallback(result -> { + histograms.forEach(histogram -> { + if (histogram.shouldRecord()) { + result.record(histogram.sum(), histogram.attributes); + } + }); + }); + this.histP50Value = meter.gaugeBuilder(name) + .setDescription(desc + " (50th percentile)") + .setUnit(unit) + .buildWithCallback(result -> { + histograms.forEach(histogram -> { + if (histogram.shouldRecord()) { + result.record(histogram.p50(), histogram.attributes); + } + }); + }); + this.histP99Value = meter.gaugeBuilder(name) + .setDescription(desc + " (99th percentile)") + .setUnit(unit) + .buildWithCallback(result -> { + histograms.forEach(histogram -> { + if (histogram.shouldRecord()) { + result.record(histogram.p99(), histogram.attributes); + } + }); + }); + this.histMeanValue = meter.gaugeBuilder(name) + .setDescription(desc + " (mean)") + .setUnit(unit) + .buildWithCallback(result -> { + histograms.forEach(histogram -> { + if (histogram.shouldRecord()) { + result.record(histogram.mean(), histogram.attributes); + } + }); + }); + this.histMaxValue = meter.gaugeBuilder(name) + .setDescription(desc + " (max)") + .setUnit(unit) + .buildWithCallback(result -> { + histograms.forEach(histogram -> { + if (histogram.shouldRecord()) { + result.record(histogram.max(), histogram.attributes); + } + }); + }); + } + + public void registerYammerHistogramMetric(YammerHistogramMetric histogram) { + histograms.add(histogram); + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java deleted file mode 100644 index b923a8839..000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2024, AutoMQ CO.,LTD. - * - * Use of this software is governed by the Business Source License - * included in the file BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -package com.automq.stream.s3.metrics.wrapper; - -import com.automq.stream.s3.metrics.MetricsConfig; -import com.automq.stream.s3.metrics.MetricsLevel; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongHistogram; - -public class HistogramMetric extends ConfigurableMetrics { - private final LongHistogram longHistogram; - - public HistogramMetric(MetricsConfig metricsConfig, LongHistogram longHistogram) { - this(metricsConfig, Attributes.empty(), longHistogram); - } - - public HistogramMetric(MetricsConfig metricsConfig, Attributes extraAttributes, LongHistogram longHistogram) { - super(metricsConfig, extraAttributes); - this.longHistogram = longHistogram; - } - - public boolean record(MetricsLevel metricsLevel, long value) { - if (metricsLevel.isWithin(this.metricsLevel)) { - longHistogram.record(value, attributes); - return true; - } - return false; - } -} diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/YammerHistogramMetric.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/YammerHistogramMetric.java new file mode 100644 index 000000000..a7b62d50d --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/YammerHistogramMetric.java @@ -0,0 +1,66 @@ +/* + * Copyright 2024, AutoMQ CO.,LTD. + * + * Use of this software is governed by the Business Source License + * included in the file BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.metrics.wrapper; + +import com.automq.stream.s3.metrics.MetricsConfig; +import com.automq.stream.s3.metrics.MetricsLevel; +import com.automq.stream.s3.metrics.S3StreamMetricsManager; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.MetricName; +import io.opentelemetry.api.common.Attributes; + +public class YammerHistogramMetric extends ConfigurableMetrics { + private final Histogram histogram; + private final MetricsLevel currentMetricsLevel; + + public YammerHistogramMetric(MetricName metricName, MetricsLevel currentMetricsLevel, MetricsConfig metricsConfig) { + this(metricName, currentMetricsLevel, metricsConfig, Attributes.empty()); + } + + public YammerHistogramMetric(MetricName metricName, MetricsLevel currentMetricsLevel, MetricsConfig metricsConfig, Attributes extraAttributes) { + super(metricsConfig, extraAttributes); + this.histogram = S3StreamMetricsManager.METRICS_REGISTRY.newHistogram(metricName, true); + this.currentMetricsLevel = currentMetricsLevel; + } + + public long count() { + return histogram.count(); + } + + public long sum() { + return (long) histogram.sum(); + } + + public double p50() { + return histogram.getSnapshot().getMedian(); + } + + public double p99() { + return histogram.getSnapshot().get99thPercentile(); + } + + public double mean() { + return histogram.mean(); + } + + public double max() { + return histogram.max(); + } + + public void record(long value) { + histogram.update(value); + } + + public boolean shouldRecord() { + return currentMetricsLevel.isWithin(metricsLevel); + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 42f5558fc..7aa0e3aee 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -210,7 +210,7 @@ public CompletableFuture rangeRead(String path, long start, long end, T TimerUtil timerUtil = new TimerUtil(); networkInboundBandwidthLimiter.consume(throttleStrategy, end - start).whenCompleteAsync((v, ex) -> { NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND) - .record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + .record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (ex != null) { cf.completeExceptionally(ex); } else { @@ -311,12 +311,11 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture mergedRangeRead0(path, start, end, cf), 100, TimeUnit.MILLISECONDS); } - S3OperationStats.getInstance().getObjectStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().getObjectStats(size, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); }; readS3Client.getObject(request, AsyncResponseTransformer.toPublisher()) .thenAccept(responsePublisher -> { - S3ObjectStats.getInstance().objectDownloadSizeStats.record(MetricsLevel.INFO, size); CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer(); responsePublisher.subscribe((bytes) -> { // the aws client will copy DefaultHttpContent to heap ByteBuffer @@ -327,7 +326,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture { buf.release(); @@ -352,7 +351,7 @@ public CompletableFuture write(String path, ByteBuf data, ThrottleStrategy TimerUtil timerUtil = new TimerUtil(); networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> { NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND) - .record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + .record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (ex != null) { cf.completeExceptionally(ex); } else { @@ -372,12 +371,12 @@ private void write0(String path, ByteBuf data, CompletableFuture cf) { AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers()); writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> { S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize); - S3OperationStats.getInstance().putObjectStats(objectSize, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().putObjectStats(objectSize, true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); data.release(); cf.complete(null); }).exceptionally(ex -> { - S3OperationStats.getInstance().putObjectStats(objectSize, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().putObjectStats(objectSize, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.error("PutObject for object {} fail", path, ex); cf.completeExceptionally(ex); @@ -400,10 +399,10 @@ public CompletableFuture delete(String path) { TimerUtil timerUtil = new TimerUtil(); DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build(); return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> { - S3OperationStats.getInstance().deleteObjectStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().deleteObjectStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); }).exceptionally(ex -> { - S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().deleteObjectsStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage()); return null; }); @@ -423,11 +422,11 @@ public CompletableFuture> delete(List objectKeys) { .build(); // TODO: handle not exist object, should we regard it as deleted or ignore it. return this.writeS3Client.deleteObjects(request).thenApply(resp -> { - S3OperationStats.getInstance().deleteObjectsStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().deleteObjectsStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList()); }).exceptionally(ex -> { - S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().deleteObjectsStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage()); return Collections.emptyList(); }); @@ -448,10 +447,10 @@ void createMultipartUpload0(String path, CompletableFuture cf) { TimerUtil timerUtil = new TimerUtil(); CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build(); writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> { - S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().createMultiPartUploadStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); cf.complete(createMultipartUploadResponse.uploadId()); }).exceptionally(ex -> { - S3OperationStats.getInstance().createMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().createMultiPartUploadStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.error("CreateMultipartUpload for object {} fail", path, ex); cf.completeExceptionally(ex); @@ -495,12 +494,12 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p CompletableFuture uploadPartCf = writeS3Client.uploadPart(request, body); uploadPartCf.thenAccept(uploadPartResponse -> { S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size); - S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().uploadPartStats(size, true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); part.release(); CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build(); cf.complete(completedPart); }).exceptionally(ex -> { - S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().uploadPartStats(size, false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex); part.release(); @@ -532,12 +531,12 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath) .destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build(); writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> { - S3OperationStats.getInstance().uploadPartCopyStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().uploadPartCopyStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber) .eTag(uploadPartCopyResponse.copyPartResult().eTag()).build(); cf.complete(completedPart); }).exceptionally(ex -> { - S3OperationStats.getInstance().uploadPartCopyStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().uploadPartCopyStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex); cf.completeExceptionally(ex); @@ -567,10 +566,10 @@ public void completeMultipartUpload0(String path, String uploadId, List { - S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); cf.complete(null); }).exceptionally(ex -> { - S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); if (isUnrecoverable(ex)) { LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex); cf.completeExceptionally(ex); diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java index bd76c9e7e..76103c951 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java @@ -148,14 +148,14 @@ public CompletableFuture close() { objectPart = null; } - S3ObjectStats.getInstance().objectStageReadyCloseStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3ObjectStats.getInstance().objectStageReadyCloseStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); closeCf = new CompletableFuture<>(); CompletableFuture uploadDoneCf = uploadIdCf.thenCompose(uploadId -> CompletableFuture.allOf(parts.toArray(new CompletableFuture[0]))); FutureUtil.propagate(uploadDoneCf.thenCompose(nil -> operator.completeMultipartUpload(path, uploadId, genCompleteParts())), closeCf); closeCf.whenComplete((nil, ex) -> { - S3ObjectStats.getInstance().objectStageTotalStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3ObjectStats.getInstance().objectStageTotalStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); S3ObjectStats.getInstance().objectNumInTotalStats.add(MetricsLevel.DEBUG, 1); - S3ObjectStats.getInstance().objectUploadSizeStats.record(MetricsLevel.DEBUG, totalWriteSize.get()); + S3ObjectStats.getInstance().objectUploadSizeStats.record(totalWriteSize.get()); }); return closeCf; } @@ -233,7 +233,7 @@ private void upload0() { TimerUtil timerUtil = new TimerUtil(); FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf); partCf.whenComplete((nil, ex) -> { - S3ObjectStats.getInstance().objectStageUploadPartStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3ObjectStats.getInstance().objectStageUploadPartStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); }); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java index f074e2935..8821586a3 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java @@ -152,13 +152,13 @@ public boolean hasBatchingPart() { @Override public CompletableFuture close() { - S3ObjectStats.getInstance().objectStageReadyCloseStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3ObjectStats.getInstance().objectStageReadyCloseStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); int size = data.readableBytes(); FutureUtil.propagate(operator.write(path, data, throttleStrategy), cf); cf.whenComplete((nil, e) -> { - S3ObjectStats.getInstance().objectStageTotalStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + S3ObjectStats.getInstance().objectStageTotalStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); S3ObjectStats.getInstance().objectNumInTotalStats.add(MetricsLevel.DEBUG, 1); - S3ObjectStats.getInstance().objectUploadSizeStats.record(MetricsLevel.DEBUG, size); + S3ObjectStats.getInstance().objectUploadSizeStats.record(size); }); return cf; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java index ecee701a0..9f1e4589e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java @@ -118,6 +118,6 @@ public long size() { @Override public void polled() { - StorageOperationStats.getInstance().appendWALBlockPolledStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().appendWALBlockPolledStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS)); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index 6dad28e1f..d7a0a44c4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -375,7 +375,7 @@ public AppendResult append(TraceContext context, ByteBuf buf, int crc) throws Ov return result; } catch (OverCapacityException ex) { buf.release(); - StorageOperationStats.getInstance().appendWALFullStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().appendWALFullStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); TraceUtils.endSpan(scope, ex); throw ex; } @@ -407,8 +407,8 @@ private AppendResult append0(ByteBuf body, int crc) throws OverCapacityException slidingWindowService.tryWriteBlock(); final AppendResult appendResult = new AppendResultImpl(expectedWriteOffset, appendResultFuture); - appendResult.future().whenComplete((nil, ex) -> StorageOperationStats.getInstance().appendWALCompleteStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS))); - StorageOperationStats.getInstance().appendWALBeforeStats.record(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); + appendResult.future().whenComplete((nil, ex) -> StorageOperationStats.getInstance().appendWALCompleteStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS))); + StorageOperationStats.getInstance().appendWALBeforeStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); return appendResult; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java index 64964e294..3a8812b44 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java @@ -354,7 +354,7 @@ private void writeBlockData(BlockBatch blocks) { walChannel.retryWrite(block.data(), position); } walChannel.retryFlush(); - StorageOperationStats.getInstance().appendWALWriteStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().appendWALWriteStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS)); } private void makeWriteOffsetMatchWindow(long newWindowEndOffset) { @@ -533,7 +533,7 @@ public WriteBlockProcessor(BlockBatch blocks) { @Override public void run() { - StorageOperationStats.getInstance().appendWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().appendWALAwaitStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS)); try { writeBlock(this.blocks); } catch (Exception e) { @@ -564,7 +564,7 @@ public String toString() { return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}'; } }); - StorageOperationStats.getInstance().appendWALAfterStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); + StorageOperationStats.getInstance().appendWALAfterStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS)); } } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java index 7512a0a7d..8b673aa30 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java @@ -19,6 +19,7 @@ import com.automq.stream.s3.metrics.MetricsConfig; import com.automq.stream.s3.metrics.MetricsLevel; +import com.yammer.metrics.core.MetricName; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.LongHistogram; @@ -38,13 +39,13 @@ public void testConfigurableMetrics() { Assertions.assertEquals(MetricsLevel.DEBUG, metric.metricsLevel); Assertions.assertEquals(Attributes.builder().put("extra", "v").put("base", "v2").build(), metric.attributes); - HistogramMetric histogramMetric = new HistogramMetric(new MetricsConfig(), Attributes.builder().put("extra", "v").build(), - Mockito.mock(LongHistogram.class)); - Assertions.assertEquals(MetricsLevel.INFO, histogramMetric.metricsLevel); + YammerHistogramMetric yammerHistogramMetric = new YammerHistogramMetric(Mockito.mock(MetricName.class), MetricsLevel.INFO, new MetricsConfig(), + Attributes.builder().put("extra", "v").build()); + Assertions.assertEquals(MetricsLevel.INFO, yammerHistogramMetric.metricsLevel); - histogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, Attributes.builder().put("base", "v2").build())); - Assertions.assertEquals(MetricsLevel.DEBUG, histogramMetric.metricsLevel); - Assertions.assertEquals(Attributes.builder().put("extra", "v").put("base", "v2").build(), histogramMetric.attributes); + yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, Attributes.builder().put("base", "v2").build())); + Assertions.assertEquals(MetricsLevel.DEBUG, yammerHistogramMetric.metricsLevel); + Assertions.assertEquals(Attributes.builder().put("extra", "v").put("base", "v2").build(), yammerHistogramMetric.attributes); } @Test @@ -56,11 +57,15 @@ public void testMetricsLevel() { Assertions.assertTrue(metric.add(MetricsLevel.INFO, 1)); Assertions.assertTrue(metric.add(MetricsLevel.DEBUG, 1)); - HistogramMetric histogramMetric = new HistogramMetric(new MetricsConfig(MetricsLevel.INFO, null), Mockito.mock(LongHistogram.class)); - Assertions.assertTrue(histogramMetric.record(MetricsLevel.INFO, 1)); - Assertions.assertFalse(histogramMetric.record(MetricsLevel.DEBUG, 1)); - histogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, null)); - Assertions.assertTrue(histogramMetric.record(MetricsLevel.INFO, 1)); - Assertions.assertTrue(histogramMetric.record(MetricsLevel.DEBUG, 1)); + YammerHistogramMetric yammerHistogramMetric = new YammerHistogramMetric(Mockito.mock(MetricName.class), MetricsLevel.INFO, new MetricsConfig(), + Attributes.builder().put("extra", "v").build()); + Assertions.assertTrue(yammerHistogramMetric.shouldRecord()); + yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, null)); + Assertions.assertTrue(yammerHistogramMetric.shouldRecord()); + yammerHistogramMetric = new YammerHistogramMetric(Mockito.mock(MetricName.class), MetricsLevel.DEBUG, new MetricsConfig(), + Attributes.builder().put("extra", "v").build()); + Assertions.assertFalse(yammerHistogramMetric.shouldRecord()); + yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.INFO, null)); + Assertions.assertTrue(yammerHistogramMetric.shouldRecord()); } } From a4e63cde578f5c62826c1d5a46caf0037368f6f8 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Wed, 6 Mar 2024 10:24:55 +0800 Subject: [PATCH 2/4] feat(s3stream): reduce histogram sample size by dividing into explicit percentiles Signed-off-by: Shichao Nie --- s3stream/build.gradle | 1 + .../s3/metrics/S3StreamMetricsConstant.java | 7 ++++++- .../s3/metrics/operations/S3Operation.java | 2 +- .../s3/metrics/stats/S3OperationStats.java | 18 ++++++++++++------ .../metrics/wrapper/HistogramInstrument.java | 14 ++++++++------ 5 files changed, 28 insertions(+), 14 deletions(-) diff --git a/s3stream/build.gradle b/s3stream/build.gradle index 8ffff008d..47e059998 100644 --- a/s3stream/build.gradle +++ b/s3stream/build.gradle @@ -48,6 +48,7 @@ dependencies { api 'org.aspectj:aspectjrt:1.9.20.1' api 'org.aspectj:aspectjweaver:1.9.20.1' api 'com.github.jnr:jnr-posix:3.1.19' + api 'com.yammer.metrics:metrics-core:2.2.0' testImplementation 'org.slf4j:slf4j-simple:2.0.9' testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0' testImplementation 'org.mockito:mockito-core:5.5.0' diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index 2bce6ac7b..1c1fb7974 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -70,7 +70,6 @@ public class S3StreamMetricsConstant { public static final String OBJECT_COUNT_METRIC_NAME = "object_count"; public static final String OBJECT_STAGE_COST_METRIC_NAME = "object_stage_cost"; public static final String OBJECT_UPLOAD_SIZE_METRIC_NAME = "object_upload_size"; - public static final String OBJECT_DOWNLOAD_SIZE_METRIC_NAME = "object_download_size"; public static final String NETWORK_INBOUND_USAGE_METRIC_NAME = "network_inbound_usage"; public static final String NETWORK_OUTBOUND_USAGE_METRIC_NAME = "network_outbound_usage"; public static final String NETWORK_INBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME = "network_inbound_available_bandwidth"; @@ -80,6 +79,12 @@ public class S3StreamMetricsConstant { public static final String NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_inbound_limiter_queue_time"; public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_outbound_limiter_queue_time"; public static final String READ_AHEAD_SIZE_METRIC_NAME = "read_ahead_size"; + public static final String SUM_METRIC_NAME_SUFFIX = "_sum"; + public static final String COUNT_METRIC_NAME_SUFFIX = "_count"; + public static final String P50_METRIC_NAME_SUFFIX = "_50p"; + public static final String P99_METRIC_NAME_SUFFIX = "_99p"; + public static final String MEAN_METRIC_NAME_SUFFIX = "_mean"; + public static final String MAX_METRIC_NAME_SUFFIX = "_max"; public static final String WAL_START_OFFSET = "wal_start_offset"; public static final String WAL_TRIMMED_OFFSET = "wal_trimmed_offset"; public static final String DELTA_WAL_CACHE_SIZE = "delta_wal_cache_size"; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java index bd4ac18ac..f5608e4c5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Operation.java @@ -67,7 +67,7 @@ public enum S3Operation { S3Operation(S3MetricsType type, String name) { this.type = type; this.name = name; - uniqueKey = type + "-" + name; + uniqueKey = type.getName() + "-" + name; } public String getName() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java index e10fec3bc..1d6d8d719 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/S3OperationStats.java @@ -81,11 +81,13 @@ public YammerHistogramMetric getObjectStats(long size, boolean isSuccess) { String label = AttributesUtils.getObjectBucketLabel(size); if (isSuccess) { return getObjectSuccessStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - new MetricName(S3OperationStats.class, S3Operation.GET_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + new MetricName(S3OperationStats.class, S3Operation.GET_OBJECT.getUniqueKey() + + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS + label), MetricsLevel.INFO, S3Operation.GET_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label)); } else { return getObjectFailedStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - new MetricName(S3OperationStats.class, S3Operation.GET_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + new MetricName(S3OperationStats.class, S3Operation.GET_OBJECT.getUniqueKey() + + S3StreamMetricsConstant.LABEL_STATUS_FAILED + label), MetricsLevel.INFO, S3Operation.GET_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label)); } } @@ -94,11 +96,13 @@ public YammerHistogramMetric putObjectStats(long size, boolean isSuccess) { String label = AttributesUtils.getObjectBucketLabel(size); if (isSuccess) { return putObjectSuccessStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - new MetricName(S3OperationStats.class, S3Operation.PUT_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + new MetricName(S3OperationStats.class, S3Operation.PUT_OBJECT.getUniqueKey() + + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS + label), MetricsLevel.INFO, S3Operation.PUT_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label)); } else { return putObjectFailedStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - new MetricName(S3OperationStats.class, S3Operation.PUT_OBJECT.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + new MetricName(S3OperationStats.class, S3Operation.PUT_OBJECT.getUniqueKey() + + S3StreamMetricsConstant.LABEL_STATUS_FAILED + label), MetricsLevel.INFO, S3Operation.PUT_OBJECT, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label)); } } @@ -107,11 +111,13 @@ public YammerHistogramMetric uploadPartStats(long size, boolean isSuccess) { String label = AttributesUtils.getObjectBucketLabel(size); if (isSuccess) { return uploadPartSuccessStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS), + new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART.getUniqueKey() + + S3StreamMetricsConstant.LABEL_STATUS_SUCCESS + label), MetricsLevel.INFO, S3Operation.UPLOAD_PART, S3StreamMetricsConstant.LABEL_STATUS_SUCCESS, label)); } else { return uploadPartFailedStats.computeIfAbsent(label, name -> S3StreamMetricsManager.buildOperationMetric( - new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART.getUniqueKey() + S3StreamMetricsConstant.LABEL_STATUS_FAILED), + new MetricName(S3OperationStats.class, S3Operation.UPLOAD_PART.getUniqueKey() + + S3StreamMetricsConstant.LABEL_STATUS_FAILED + label), MetricsLevel.INFO, S3Operation.UPLOAD_PART, S3StreamMetricsConstant.LABEL_STATUS_FAILED, label)); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java index ef6617828..a0d591bc6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java @@ -17,6 +17,7 @@ package com.automq.stream.s3.metrics.wrapper; +import com.automq.stream.s3.metrics.S3StreamMetricsConstant; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableDoubleGauge; import io.opentelemetry.api.metrics.ObservableLongGauge; @@ -34,7 +35,7 @@ public class HistogramInstrument { public HistogramInstrument(Meter meter, String name, String desc, String unit) { this.histograms = new CopyOnWriteArrayList<>(); - this.count = meter.gaugeBuilder(name) + this.count = meter.gaugeBuilder(name + S3StreamMetricsConstant.COUNT_METRIC_NAME_SUFFIX) .setDescription(desc + " (count)") .ofLongs() .buildWithCallback(result -> { @@ -44,9 +45,10 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { } }); }); - this.sum = meter.gaugeBuilder(name) + this.sum = meter.gaugeBuilder(name + S3StreamMetricsConstant.SUM_METRIC_NAME_SUFFIX) .setDescription(desc + " (sum)") .ofLongs() + .setUnit(unit) .buildWithCallback(result -> { histograms.forEach(histogram -> { if (histogram.shouldRecord()) { @@ -54,7 +56,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { } }); }); - this.histP50Value = meter.gaugeBuilder(name) + this.histP50Value = meter.gaugeBuilder(name + S3StreamMetricsConstant.P50_METRIC_NAME_SUFFIX) .setDescription(desc + " (50th percentile)") .setUnit(unit) .buildWithCallback(result -> { @@ -64,7 +66,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { } }); }); - this.histP99Value = meter.gaugeBuilder(name) + this.histP99Value = meter.gaugeBuilder(name + S3StreamMetricsConstant.P99_METRIC_NAME_SUFFIX) .setDescription(desc + " (99th percentile)") .setUnit(unit) .buildWithCallback(result -> { @@ -74,7 +76,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { } }); }); - this.histMeanValue = meter.gaugeBuilder(name) + this.histMeanValue = meter.gaugeBuilder(name + S3StreamMetricsConstant.MEAN_METRIC_NAME_SUFFIX) .setDescription(desc + " (mean)") .setUnit(unit) .buildWithCallback(result -> { @@ -84,7 +86,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { } }); }); - this.histMaxValue = meter.gaugeBuilder(name) + this.histMaxValue = meter.gaugeBuilder(name + S3StreamMetricsConstant.MAX_METRIC_NAME_SUFFIX) .setDescription(desc + " (max)") .setUnit(unit) .buildWithCallback(result -> { From b1f615aa725ff8d35c475a30bf3a14905f1783eb Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Wed, 6 Mar 2024 16:42:31 +0800 Subject: [PATCH 3/4] fix(s3stream): remove unused imports Signed-off-by: Shichao Nie --- s3stream/src/main/java/com/automq/stream/s3/S3Storage.java | 1 - s3stream/src/main/java/com/automq/stream/s3/S3Stream.java | 1 - s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java | 1 - .../java/com/automq/stream/s3/cache/DefaultS3BlockCache.java | 1 - s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java | 1 - .../src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java | 1 - .../src/main/java/com/automq/stream/s3/cache/StreamReader.java | 1 - .../java/com/automq/stream/s3/operator/DefaultS3Operator.java | 1 - s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java | 1 - .../src/main/java/com/automq/stream/s3/wal/BlockWALService.java | 1 - .../main/java/com/automq/stream/s3/wal/SlidingWindowService.java | 1 - .../com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java | 1 - 12 files changed, 12 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index f3f279b61..11ab16d80 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -19,7 +19,6 @@ import com.automq.stream.s3.context.AppendContext; import com.automq.stream.s3.context.FetchContext; import com.automq.stream.s3.metadata.StreamMetadata; -import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StorageOperationStats; diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index e4c94976f..a2890f60a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -24,7 +24,6 @@ import com.automq.stream.s3.cache.CacheAccessType; import com.automq.stream.s3.context.AppendContext; import com.automq.stream.s3.context.FetchContext; -import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StreamOperationStats; import com.automq.stream.s3.model.StreamRecordBatch; diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java index 2cffa16d4..2dee62a96 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java @@ -20,7 +20,6 @@ import com.automq.stream.api.StreamClient; import com.automq.stream.s3.context.AppendContext; import com.automq.stream.s3.context.FetchContext; -import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StreamOperationStats; import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java index 3c89be111..6ea9d9f05 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java @@ -12,7 +12,6 @@ package com.automq.stream.s3.cache; import com.automq.stream.s3.Config; -import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StorageOperationStats; import com.automq.stream.s3.model.StreamRecordBatch; diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java index 0c514a163..85e2623e2 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java @@ -11,7 +11,6 @@ package com.automq.stream.s3.cache; -import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StorageOperationStats; diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java index 104650f08..efdeab097 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java @@ -11,7 +11,6 @@ package com.automq.stream.s3.cache; -import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StorageOperationStats; import com.automq.stream.utils.LogContext; diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java index 3bd068fd2..6f3610a08 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java @@ -14,7 +14,6 @@ import com.automq.stream.s3.ObjectReader; import com.automq.stream.s3.StreamDataBlock; import com.automq.stream.s3.metadata.S3ObjectMetadata; -import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StorageOperationStats; import com.automq.stream.s3.model.StreamRecordBatch; diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 7aa0e3aee..97f0a5a16 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -16,7 +16,6 @@ import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.NetworkStats; -import com.automq.stream.s3.metrics.stats.S3ObjectStats; import com.automq.stream.s3.metrics.stats.S3OperationStats; import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; import com.automq.stream.s3.network.ThrottleStrategy; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java index 9f1e4589e..c5d2c7ec8 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java @@ -12,7 +12,6 @@ package com.automq.stream.s3.wal; import com.automq.stream.s3.ByteBufAlloc; -import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StorageOperationStats; import com.automq.stream.s3.wal.util.WALUtil; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index d7a0a44c4..697e249b4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -13,7 +13,6 @@ import com.automq.stream.s3.ByteBufAlloc; import com.automq.stream.s3.Config; -import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StorageOperationStats; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java index 3a8812b44..ceb15991c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java @@ -12,7 +12,6 @@ package com.automq.stream.s3.wal; import com.automq.stream.s3.ByteBufAlloc; -import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StorageOperationStats; import com.automq.stream.s3.wal.util.WALChannel; diff --git a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java index 8b673aa30..d08dfd4a7 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java @@ -22,7 +22,6 @@ import com.yammer.metrics.core.MetricName; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.api.metrics.LongHistogram; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; From 7b8b7d5b108cb462a1770cbc4555f0fc9233b42a Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Wed, 6 Mar 2024 17:22:57 +0800 Subject: [PATCH 4/4] fix(s3stream): fix unit tests Signed-off-by: Shichao Nie --- .../s3/metrics/S3StreamMetricsConstant.java | 1 - .../s3/metrics/S3StreamMetricsManager.java | 43 ++++++++++--------- .../metrics/wrapper/HistogramInstrument.java | 16 +++---- .../metrics/wrapper/MetricsWrapperTest.java | 2 +- 4 files changed, 32 insertions(+), 30 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index 1c1fb7974..5d5dbf460 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -69,7 +69,6 @@ public class S3StreamMetricsConstant { public static final String OPERATION_LATENCY_METRIC_NAME = "operation_latency"; public static final String OBJECT_COUNT_METRIC_NAME = "object_count"; public static final String OBJECT_STAGE_COST_METRIC_NAME = "object_stage_cost"; - public static final String OBJECT_UPLOAD_SIZE_METRIC_NAME = "object_upload_size"; public static final String NETWORK_INBOUND_USAGE_METRIC_NAME = "network_inbound_usage"; public static final String NETWORK_OUTBOUND_USAGE_METRIC_NAME = "network_outbound_usage"; public static final String NETWORK_INBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME = "network_inbound_available_bandwidth"; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index d4e71d552..c50bda24d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -29,18 +29,23 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Supplier; public class S3StreamMetricsManager { private static final List BASE_ATTRIBUTES_LISTENERS = new ArrayList<>(); public static final MetricsRegistry METRICS_REGISTRY = new MetricsRegistry(); + public static final List OPERATION_LATENCY_METRICS = new CopyOnWriteArrayList<>(); + public static final List OBJECT_STAGE_METRICS = new CopyOnWriteArrayList<>(); + public static final List NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>(); + public static final List NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>(); + public static final List READ_AHEAD_SIZE_METRICS = new CopyOnWriteArrayList<>(); + public static final List READ_AHEAD_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>(); private static LongCounter s3DownloadSizeInTotal = new NoopLongCounter(); private static LongCounter s3UploadSizeInTotal = new NoopLongCounter(); private static HistogramInstrument operationLatency; private static LongCounter objectNumInTotal = new NoopLongCounter(); private static HistogramInstrument objectStageCost; - private static HistogramInstrument objectUploadSize; - private static HistogramInstrument objectDownloadSize; private static LongCounter networkInboundUsageInTotal = new NoopLongCounter(); private static LongCounter networkOutboundUsageInTotal = new NoopLongCounter(); private static ObservableLongGauge networkInboundAvailableBandwidth = new NoopObservableLongGauge(); @@ -106,14 +111,12 @@ public static void initMetrics(Meter meter, String prefix) { .setUnit("bytes") .build(); operationLatency = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OPERATION_LATENCY_METRIC_NAME, - "Operation latency", "nanoseconds"); + "Operation latency", "nanoseconds", () -> OPERATION_LATENCY_METRICS); objectNumInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.OBJECT_COUNT_METRIC_NAME) .setDescription("Objects count") .build(); objectStageCost = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OBJECT_STAGE_COST_METRIC_NAME, - "Objects stage cost", "nanoseconds"); - objectUploadSize = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OBJECT_UPLOAD_SIZE_METRIC_NAME, - "Objects upload size", "bytes"); + "Objects stage cost", "nanoseconds", () -> OBJECT_STAGE_METRICS); networkInboundUsageInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_USAGE_METRIC_NAME) .setDescription("Network inbound usage") .setUnit("bytes") @@ -157,13 +160,13 @@ public static void initMetrics(Meter meter, String prefix) { } }); networkInboundLimiterQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME, - "Network inbound limiter queue time", "nanoseconds"); + "Network inbound limiter queue time", "nanoseconds", () -> NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS); networkOutboundLimiterQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME, - "Network outbound limiter queue time", "nanoseconds"); + "Network outbound limiter queue time", "nanoseconds", () -> NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS); readAheadSize = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.READ_AHEAD_SIZE_METRIC_NAME, - "Read ahead size", "bytes"); + "Read ahead size", "bytes", () -> READ_AHEAD_SIZE_METRICS); readAheadLimierQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.READ_AHEAD_QUEUE_TIME_METRIC_NAME, - "Read ahead limiter queue time", "nanoseconds"); + "Read ahead limiter queue time", "nanoseconds", () -> READ_AHEAD_LIMITER_QUEUE_TIME_METRICS); deltaWalStartOffset = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.WAL_START_OFFSET) .setDescription("Delta WAL start offset") .ofLongs() @@ -330,7 +333,7 @@ public static YammerHistogramMetric buildStageOperationMetric(MetricName metricN YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, AttributesUtils.buildAttributes(stage)); BASE_ATTRIBUTES_LISTENERS.add(metric); - operationLatency.registerYammerHistogramMetric(metric); + OPERATION_LATENCY_METRICS.add(metric); return metric; } } @@ -340,7 +343,7 @@ public static YammerHistogramMetric buildOperationMetric(MetricName metricName, YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, AttributesUtils.buildAttributes(operation)); BASE_ATTRIBUTES_LISTENERS.add(metric); - operationLatency.registerYammerHistogramMetric(metric); + OPERATION_LATENCY_METRICS.add(metric); return metric; } } @@ -351,7 +354,7 @@ public static YammerHistogramMetric buildOperationMetric(MetricName metricName, YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, AttributesUtils.buildAttributes(operation, status, sizeLabelName)); BASE_ATTRIBUTES_LISTENERS.add(metric); - operationLatency.registerYammerHistogramMetric(metric); + OPERATION_LATENCY_METRICS.add(metric); return metric; } } @@ -361,7 +364,7 @@ public static YammerHistogramMetric buildOperationMetric(MetricName metricName, YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, AttributesUtils.buildAttributes(operation, status)); BASE_ATTRIBUTES_LISTENERS.add(metric); - operationLatency.registerYammerHistogramMetric(metric); + OPERATION_LATENCY_METRICS.add(metric); return metric; } } @@ -379,7 +382,7 @@ public static YammerHistogramMetric buildObjectStageCostMetric(MetricName metric YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, AttributesUtils.buildAttributes(stage)); BASE_ATTRIBUTES_LISTENERS.add(metric); - objectStageCost.registerYammerHistogramMetric(metric); + OBJECT_STAGE_METRICS.add(metric); return metric; } } @@ -388,7 +391,7 @@ public static YammerHistogramMetric buildObjectUploadSizeMetric(MetricName metri synchronized (BASE_ATTRIBUTES_LISTENERS) { YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); - objectStageCost.registerYammerHistogramMetric(metric); + OBJECT_STAGE_METRICS.add(metric); return metric; } } @@ -413,7 +416,7 @@ public static YammerHistogramMetric buildNetworkInboundLimiterQueueTimeMetric(Me synchronized (BASE_ATTRIBUTES_LISTENERS) { YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); - networkInboundLimiterQueueTime.registerYammerHistogramMetric(metric); + NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS.add(metric); return metric; } } @@ -422,7 +425,7 @@ public static YammerHistogramMetric buildNetworkOutboundLimiterQueueTimeMetric(M synchronized (BASE_ATTRIBUTES_LISTENERS) { YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); - networkOutboundLimiterQueueTime.registerYammerHistogramMetric(metric); + NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS.add(metric); return metric; } } @@ -431,7 +434,7 @@ public static YammerHistogramMetric buildReadAheadSizeMetric(MetricName metricNa synchronized (BASE_ATTRIBUTES_LISTENERS) { YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); - readAheadSize.registerYammerHistogramMetric(metric); + READ_AHEAD_SIZE_METRICS.add(metric); return metric; } @@ -441,7 +444,7 @@ public static YammerHistogramMetric buildReadAheadLimiterQueueTimeMetric(MetricN synchronized (BASE_ATTRIBUTES_LISTENERS) { YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); - readAheadLimierQueueTime.registerYammerHistogramMetric(metric); + READ_AHEAD_LIMITER_QUEUE_TIME_METRICS.add(metric); return metric; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java index a0d591bc6..2b9066020 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java @@ -22,10 +22,9 @@ import io.opentelemetry.api.metrics.ObservableDoubleGauge; import io.opentelemetry.api.metrics.ObservableLongGauge; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Supplier; public class HistogramInstrument { - private final List histograms; private final ObservableLongGauge count; private final ObservableLongGauge sum; private final ObservableDoubleGauge histP50Value; @@ -33,12 +32,12 @@ public class HistogramInstrument { private final ObservableDoubleGauge histMeanValue; private final ObservableDoubleGauge histMaxValue; - public HistogramInstrument(Meter meter, String name, String desc, String unit) { - this.histograms = new CopyOnWriteArrayList<>(); + public HistogramInstrument(Meter meter, String name, String desc, String unit, Supplier> histogramsSupplier) { this.count = meter.gaugeBuilder(name + S3StreamMetricsConstant.COUNT_METRIC_NAME_SUFFIX) .setDescription(desc + " (count)") .ofLongs() .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.count(), histogram.attributes); @@ -50,6 +49,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { .ofLongs() .setUnit(unit) .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.sum(), histogram.attributes); @@ -60,6 +60,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { .setDescription(desc + " (50th percentile)") .setUnit(unit) .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.p50(), histogram.attributes); @@ -70,6 +71,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { .setDescription(desc + " (99th percentile)") .setUnit(unit) .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.p99(), histogram.attributes); @@ -80,6 +82,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { .setDescription(desc + " (mean)") .setUnit(unit) .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.mean(), histogram.attributes); @@ -90,6 +93,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { .setDescription(desc + " (max)") .setUnit(unit) .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.max(), histogram.attributes); @@ -97,8 +101,4 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { }); }); } - - public void registerYammerHistogramMetric(YammerHistogramMetric histogram) { - histograms.add(histogram); - } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java index d08dfd4a7..fbea91eaf 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java @@ -64,7 +64,7 @@ public void testMetricsLevel() { yammerHistogramMetric = new YammerHistogramMetric(Mockito.mock(MetricName.class), MetricsLevel.DEBUG, new MetricsConfig(), Attributes.builder().put("extra", "v").build()); Assertions.assertFalse(yammerHistogramMetric.shouldRecord()); - yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.INFO, null)); + yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, null)); Assertions.assertTrue(yammerHistogramMetric.shouldRecord()); } }