From d7ff1c4c7b6a7eaad94e8498f8aa427b480a44d5 Mon Sep 17 00:00:00 2001 From: lifepuzzlefun Date: Fri, 16 Aug 2024 15:38:41 +0800 Subject: [PATCH] feat(s3stream): add AsyncLRUCache metric --- .../automq/stream/s3/cache/AsyncLRUCache.java | 35 +++- .../stream/s3/cache/ObjectReaderLRUCache.java | 4 +- .../DefaultObjectReaderFactory.java | 2 +- .../stream/s3/index/NodeRangeIndexCache.java | 2 +- .../s3/metrics/S3StreamMetricsConstant.java | 13 ++ .../s3/metrics/S3StreamMetricsManager.java | 198 ++++++++++++++++++ .../s3/metrics/stats/AsyncLRUCacheStats.java | 95 +++++++++ .../stream/s3/cache/AsyncLRUCacheTest.java | 8 +- .../s3/cache/ObjectReaderLRUCacheTest.java | 4 +- 9 files changed, 347 insertions(+), 14 deletions(-) create mode 100644 s3stream/src/main/java/com/automq/stream/s3/metrics/stats/AsyncLRUCacheStats.java diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java index 5b49971c12..d24409d993 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java @@ -16,35 +16,49 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; + +import com.automq.stream.s3.metrics.S3StreamMetricsManager; +import com.automq.stream.s3.metrics.stats.AsyncLRUCacheStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class AsyncLRUCache { private static final Logger LOGGER = LoggerFactory.getLogger(AsyncLRUCache.class); + private final AsyncLRUCacheStats stats = AsyncLRUCacheStats.getInstance(); + private final String cacheName; private final long maxSize; final AtomicLong totalSize = new AtomicLong(0); final LRUCache cache = new LRUCache<>(); final Set completedSet = new HashSet<>(); final Set removedSet = new HashSet<>(); - public AsyncLRUCache(long maxSize) { - super(); + public AsyncLRUCache(String cacheName, long maxSize) { + this.cacheName = cacheName; if (maxSize <= 0) { throw new IllegalArgumentException("maxSize must be positive"); } this.maxSize = maxSize; + + S3StreamMetricsManager.registerAsyncCacheSizeSupplier(this::totalSize, cacheName); + S3StreamMetricsManager.registerAsyncCacheMaxSizeSupplier(() -> maxSize, cacheName); + S3StreamMetricsManager.registerAsyncCacheItemNumberSupplier(this::size, cacheName); } public synchronized void put(K key, V value) { V oldValue = cache.get(key); if (oldValue != null && oldValue != value) { + stats.markOverWrite(cacheName); cache.remove(key); afterRemoveValue(oldValue); + } else { + stats.markPut(cacheName); } + cache.put(key, value); value.size().whenComplete((v, ex) -> { synchronized (AsyncLRUCache.this) { if (ex != null) { + stats.markItemCompleteExceptionally(cacheName); cache.remove(key); } else if (!removedSet.contains(value)) { completedSet.add(value); @@ -64,12 +78,18 @@ public synchronized void put(K key, V value) { } public synchronized V get(K key) { - return cache.get(key); + V val = cache.get(key); + if (val == null) { + stats.markMiss(cacheName); + } else { + stats.markHit(cacheName); + } + return val; } public synchronized boolean remove(K key) { - V value = cache.get(key); + V value = cache.get(key); if (value == null) { return false; } @@ -82,9 +102,11 @@ private synchronized void afterRemoveValue(V value) { try { boolean completed = completedSet.remove(value); if (completed) { + stats.markRemoveCompleted(cacheName); totalSize.addAndGet(-value.size().get()); value.close(); } else { + stats.markRemoveNotCompleted(cacheName); removedSet.add(value); } } catch (Throwable e) { @@ -97,6 +119,9 @@ public synchronized Map.Entry pop() { if (entry != null) { afterRemoveValue(entry.getValue()); } + + stats.markPop(cacheName); + return entry; } @@ -123,5 +148,7 @@ private synchronized void evict() { afterRemoveValue(value); }); } + + stats.markEvict(cacheName); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/ObjectReaderLRUCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/ObjectReaderLRUCache.java index 0d9403bcf3..c6e5b6f5ad 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/ObjectReaderLRUCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/ObjectReaderLRUCache.java @@ -15,7 +15,7 @@ public class ObjectReaderLRUCache extends AsyncLRUCache { - public ObjectReaderLRUCache(int maxObjectSize) { - super(maxObjectSize); + public ObjectReaderLRUCache(String cacheName, int maxObjectSize) { + super(cacheName, maxObjectSize); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java index e8d1df1300..4ffaf7643a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java @@ -23,7 +23,7 @@ public class DefaultObjectReaderFactory implements ObjectReaderFactory { private final ObjectStorage objectStorage; public DefaultObjectReaderFactory(ObjectStorage objectStorage) { - this.objectReaders = new ObjectReaderLRUCache(MAX_OBJECT_READER_SIZE); + this.objectReaders = new ObjectReaderLRUCache("ObjectReader", MAX_OBJECT_READER_SIZE); this.objectStorage = objectStorage; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/index/NodeRangeIndexCache.java b/s3stream/src/main/java/com/automq/stream/s3/index/NodeRangeIndexCache.java index b092c97872..e7b4c6c57f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/index/NodeRangeIndexCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/index/NodeRangeIndexCache.java @@ -100,7 +100,7 @@ public void close() { static class LRUCache extends AsyncLRUCache { public LRUCache(int maxSize) { - super(maxSize); + super("NodeRangeIndex", maxSize); } } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index bd9f4e7638..08065abd07 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -98,6 +98,18 @@ public class S3StreamMetricsConstant { public static final String INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME = "inflight_wal_upload_tasks_count"; public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size"; public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size"; + public static final String ASYNC_CACHE_EVICT_COUNT_METRIC_NAME = "async_cache_evict"; + public static final String ASYNC_CACHE_HIT_COUNT_METRIC_NAME = "async_cache_hit"; + public static final String ASYNC_CACHE_MISS_COUNT_METRIC_NAME = "async_cache_miss"; + public static final String ASYNC_CACHE_PUT_COUNT_METRIC_NAME = "async_cache_put"; + public static final String ASYNC_CACHE_POP_COUNT_METRIC_NAME = "async_cache_pop"; + public static final String ASYNC_CACHE_OVERWRITE_COUNT_METRIC_NAME = "async_cache_overwrite"; + public static final String ASYNC_CACHE_REMOVE_NOT_COMPLETE_COUNT_METRIC_NAME = "async_cache_remove_item_not_complete"; + public static final String ASYNC_CACHE_REMOVE_COMPLETE_COUNT_METRIC_NAME = "async_cache_remove_item_complete"; + public static final String ASYNC_CACHE_ITEM_COMPLETE_EXCEPTIONALLY_COUNT_METRIC_NAME = "async_cache_item_complete_exceptionally"; + public static final String ASYNC_CACHE_ITEM_NUMBER_METRIC_NAME = "async_cache_item_count"; + public static final String ASYNC_CACHE_ITEM_SIZE_NAME = "async_cache_item_size"; + public static final String ASYNC_CACHE_ITEM_MAX_SIZE_NAME = "async_cache_max_size"; public static final String BUFFER_ALLOCATED_MEMORY_SIZE_METRIC_NAME = "buffer_allocated_memory_size"; public static final String BUFFER_USED_MEMORY_SIZE_METRIC_NAME = "buffer_used_memory_size"; public static final String READ_S3_LIMITER_TIME_METRIC_NAME = "read_s3_limiter_time"; @@ -113,6 +125,7 @@ public class S3StreamMetricsConstant { public static final AttributeKey LABEL_STATUS = AttributeKey.stringKey("status"); public static final AttributeKey LABEL_TYPE = AttributeKey.stringKey("type"); public static final AttributeKey LABEL_INDEX = AttributeKey.stringKey("index"); + public static final AttributeKey LABEL_CACHE_NAME = AttributeKey.stringKey("cacheName"); public static final String LABEL_STATUS_SUCCESS = "success"; public static final String LABEL_STATUS_FAILED = "failed"; public static final String LABEL_STATUS_HIT = "hit"; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 5b42561d6b..976e2886f6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -30,8 +30,11 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.LongSupplier; import java.util.function.Supplier; +import static com.automq.stream.s3.metrics.S3StreamMetricsConstant.LABEL_CACHE_NAME; + public class S3StreamMetricsManager { private static final List BASE_ATTRIBUTES_LISTENERS = new ArrayList<>(); public static final List OPERATION_LATENCY_METRICS = new CopyOnWriteArrayList<>(); @@ -71,12 +74,28 @@ public class S3StreamMetricsManager { private static ObservableLongGauge availableInflightReadAheadSize = new NoopObservableLongGauge(); private static ObservableLongGauge availableInflightS3ReadQuota = new NoopObservableLongGauge(); private static ObservableLongGauge availableInflightS3WriteQuota = new NoopObservableLongGauge(); + + private static ObservableLongGauge asyncCacheItemNumber = new NoopObservableLongGauge(); + private static ObservableLongGauge asyncCacheSizeNumber = new NoopObservableLongGauge(); + private static ObservableLongGauge asyncCacheMaxSizeNumber = new NoopObservableLongGauge(); + private static ObservableLongGauge inflightWALUploadTasksCount = new NoopObservableLongGauge(); private static ObservableLongGauge allocatedMemorySize = new NoopObservableLongGauge(); private static ObservableLongGauge usedMemorySize = new NoopObservableLongGauge(); private static ObservableLongGauge pendingStreamAppendLatencyMetrics = new NoopObservableLongGauge(); private static ObservableLongGauge pendingStreamFetchLatencyMetrics = new NoopObservableLongGauge(); private static ObservableLongGauge compactionDelayTimeMetrics = new NoopObservableLongGauge(); + + private static LongCounter asyncCacheEvictCount; + private static LongCounter asyncCacheHitCount; + private static LongCounter asyncCacheMissCount; + private static LongCounter asyncCachePutCount; + private static LongCounter asyncCachePopCount; + private static LongCounter asyncCacheOverWriteCount; + private static LongCounter asyncCacheRemoveNotCompletedCount; + private static LongCounter asyncCacheRemoveCompletedCount; + private static LongCounter asyncCacheItemCompleteExceptionallyCount; + private static LongCounter compactionReadSizeInTotal = new NoopLongCounter(); private static LongCounter compactionWriteSizeInTotal = new NoopLongCounter(); private static Supplier networkInboundAvailableBandwidthSupplier = () -> 0L; @@ -94,6 +113,11 @@ public class S3StreamMetricsManager { private static Map> availableInflightS3ReadQuotaSupplier = new ConcurrentHashMap<>(); private static Map> availableInflightS3WriteQuotaSupplier = new ConcurrentHashMap<>(); + + private static Map asyncCacheItemNumberSupplier = new ConcurrentHashMap<>(); + private static Map asyncCacheSizeSupplier = new ConcurrentHashMap<>(); + private static Map asyncCacheMaxSizeSupplier = new ConcurrentHashMap<>(); + private static Supplier inflightWALUploadTasksCountSupplier = () -> 0; private static Map> pendingStreamAppendLatencySupplier = new ConcurrentHashMap<>(); private static Map> pendingStreamFetchLatencySupplier = new ConcurrentHashMap<>(); @@ -283,6 +307,78 @@ public static void initMetrics(Meter meter, String prefix) { .setDescription("Compaction write size") .setUnit("bytes") .build(); + + asyncCacheEvictCount = meter.counterBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_EVICT_COUNT_METRIC_NAME) + .setDescription("AsyncLRU cache evict count") + .setUnit("count") + .build(); + + asyncCacheHitCount = meter.counterBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_HIT_COUNT_METRIC_NAME) + .setDescription("AsyncLRU cache hit count") + .setUnit("count") + .build(); + + asyncCacheMissCount = meter.counterBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_MISS_COUNT_METRIC_NAME) + .setDescription("AsyncLRU cache miss count") + .setUnit("count") + .build(); + + asyncCachePutCount = meter.counterBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_PUT_COUNT_METRIC_NAME) + .setDescription("AsyncLRU cache put item count") + .setUnit("count") + .build(); + asyncCachePopCount = meter.counterBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_POP_COUNT_METRIC_NAME) + .setDescription("AsyncLRU cache pop item count") + .setUnit("count") + .build(); + asyncCacheOverWriteCount = meter.counterBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_OVERWRITE_COUNT_METRIC_NAME) + .setDescription("AsyncLRU cache overwrite item count") + .setUnit("count") + .build(); + asyncCacheRemoveNotCompletedCount = meter.counterBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_REMOVE_NOT_COMPLETE_COUNT_METRIC_NAME) + .setDescription("AsyncLRU cache remove not completed item count") + .setUnit("count") + .build(); + asyncCacheRemoveCompletedCount = meter.counterBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_REMOVE_COMPLETE_COUNT_METRIC_NAME) + .setDescription("AsyncLRU cache remove completed item count") + .setUnit("count") + .build(); + asyncCacheItemCompleteExceptionallyCount = meter.counterBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_ITEM_COMPLETE_EXCEPTIONALLY_COUNT_METRIC_NAME) + .setDescription("AsyncLRU cache item complete exceptionally count") + .setUnit("count") + .build(); + + asyncCacheItemNumber = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_ITEM_NUMBER_METRIC_NAME) + .setDescription("AsyncLRU cache item number") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel())) { + for (Map.Entry entry : asyncCacheItemNumberSupplier.entrySet()) { + result.record(entry.getValue().getAsLong(), Attributes.of(LABEL_CACHE_NAME, entry.getKey())); + } + } + }); + asyncCacheSizeNumber = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_ITEM_SIZE_NAME) + .setDescription("AsyncLRU cache size") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel())) { + for (Map.Entry entry : asyncCacheSizeSupplier.entrySet()) { + result.record(entry.getValue().getAsLong(), Attributes.of(LABEL_CACHE_NAME, entry.getKey())); + } + } + }); + asyncCacheMaxSizeNumber = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.ASYNC_CACHE_ITEM_MAX_SIZE_NAME) + .setDescription("AsyncLRU cache max size") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel())) { + for (Map.Entry entry : asyncCacheMaxSizeSupplier.entrySet()) { + result.record(entry.getValue().getAsLong(), Attributes.of(LABEL_CACHE_NAME, entry.getKey())); + } + } + }); + allocatedMemorySize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BUFFER_ALLOCATED_MEMORY_SIZE_METRIC_NAME) .setDescription("Buffer allocated memory size") .setUnit("bytes") @@ -380,6 +476,18 @@ public static void registerInflightS3WriteQuotaSupplier(Supplier inflig S3StreamMetricsManager.availableInflightS3WriteQuotaSupplier.putIfAbsent(index, inflightS3WriteQuotaSupplier); } + public static void registerAsyncCacheSizeSupplier(LongSupplier supplier, String cacheName) { + S3StreamMetricsManager.asyncCacheSizeSupplier.put(cacheName, supplier); + } + + public static void registerAsyncCacheItemNumberSupplier(LongSupplier supplier, String cacheName) { + S3StreamMetricsManager.asyncCacheItemNumberSupplier.put(cacheName, supplier); + } + + public static void registerAsyncCacheMaxSizeSupplier(LongSupplier supplier, String cacheName) { + S3StreamMetricsManager.asyncCacheMaxSizeSupplier.put(cacheName, supplier); + } + public static void registerInflightReadSizeLimiterSupplier( Supplier availableInflightReadAheadSizeSupplier) { S3StreamMetricsManager.availableInflightReadAheadSizeSupplier = availableInflightReadAheadSizeSupplier; @@ -583,6 +691,96 @@ public static HistogramMetric buildGetIndexTimeMetric(MetricsLevel metricsLevel, } } + public static CounterMetric buildAsyncCacheEvictMetric(String cacheName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, + Attributes.of(LABEL_CACHE_NAME, cacheName), + () -> asyncCacheEvictCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static CounterMetric buildAsyncCacheHitMetric(String cacheName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, + Attributes.of(LABEL_CACHE_NAME, cacheName), + () -> asyncCacheHitCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static CounterMetric buildAsyncCacheMissMetric(String cacheName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, + Attributes.of(LABEL_CACHE_NAME, cacheName), + () -> asyncCacheMissCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static CounterMetric buildAsyncCachePutMetric(String cacheName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, + Attributes.of(LABEL_CACHE_NAME, cacheName), + () -> asyncCachePutCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static CounterMetric buildAsyncCachePopMetric(String cacheName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, + Attributes.of(LABEL_CACHE_NAME, cacheName), + () -> asyncCachePopCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static CounterMetric buildAsyncCacheOverwriteMetric(String cacheName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, + Attributes.of(LABEL_CACHE_NAME, cacheName), + () -> asyncCacheOverWriteCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static CounterMetric buildAsyncCacheRemoveNotCompleteMetric(String cacheName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, + Attributes.of(LABEL_CACHE_NAME, cacheName), + () -> asyncCacheRemoveNotCompletedCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static CounterMetric buildAsyncCacheRemoveCompleteMetric(String cacheName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, + Attributes.of(LABEL_CACHE_NAME, cacheName), + () -> asyncCacheRemoveCompletedCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static CounterMetric buildAsyncCacheItemCompleteExceptionallyMetric(String cacheName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, + Attributes.of(LABEL_CACHE_NAME, cacheName), + () -> asyncCacheItemCompleteExceptionallyCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + public static CounterMetric buildCompactionReadSizeMetric() { synchronized (BASE_ATTRIBUTES_LISTENERS) { CounterMetric metric = new CounterMetric(metricsConfig, () -> compactionReadSizeInTotal); diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/AsyncLRUCacheStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/AsyncLRUCacheStats.java new file mode 100644 index 0000000000..d5e2277394 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/AsyncLRUCacheStats.java @@ -0,0 +1,95 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.metrics.stats; + +import com.automq.stream.s3.metrics.MetricsLevel; +import com.automq.stream.s3.metrics.S3StreamMetricsManager; +import com.automq.stream.s3.metrics.wrapper.CounterMetric; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class AsyncLRUCacheStats { + private volatile static AsyncLRUCacheStats instance = null; + + private final Map evictCount = new ConcurrentHashMap<>(); + private final Map hitCount = new ConcurrentHashMap<>(); + private final Map missCount = new ConcurrentHashMap<>(); + private final Map putCount = new ConcurrentHashMap<>(); + private final Map popCount = new ConcurrentHashMap<>(); + private final Map overwriteCount = new ConcurrentHashMap<>(); + private final Map removeNotCompleted = new ConcurrentHashMap<>(); + private final Map removeCompleted = new ConcurrentHashMap<>(); + private final Map itemCompleteExceptionally = new ConcurrentHashMap<>(); + + private AsyncLRUCacheStats() { + } + + public static AsyncLRUCacheStats getInstance() { + if (instance == null) { + synchronized (NetworkStats.class) { + if (instance == null) { + instance = new AsyncLRUCacheStats(); + } + } + } + return instance; + } + + + public void markEvict(String cacheName) { + evictCount.computeIfAbsent(cacheName, S3StreamMetricsManager::buildAsyncCacheEvictMetric) + .add(MetricsLevel.DEBUG, 1); + } + + public void markHit(String cacheName) { + hitCount.computeIfAbsent(cacheName, S3StreamMetricsManager::buildAsyncCacheHitMetric) + .add(MetricsLevel.DEBUG, 1); + } + + public void markMiss(String cacheName) { + missCount.computeIfAbsent(cacheName, S3StreamMetricsManager::buildAsyncCacheMissMetric) + .add(MetricsLevel.DEBUG, 1); + } + + public void markPut(String cacheName) { + putCount.computeIfAbsent(cacheName, S3StreamMetricsManager::buildAsyncCachePutMetric) + .add(MetricsLevel.DEBUG, 1); + } + + public void markPop(String cacheName) { + popCount.computeIfAbsent(cacheName, S3StreamMetricsManager::buildAsyncCachePopMetric) + .add(MetricsLevel.DEBUG, 1); + } + + public void markOverWrite(String cacheName) { + overwriteCount.computeIfAbsent(cacheName, S3StreamMetricsManager::buildAsyncCacheOverwriteMetric) + .add(MetricsLevel.DEBUG, 1); + } + + public void markRemoveNotCompleted(String cacheName) { + removeNotCompleted.computeIfAbsent(cacheName, S3StreamMetricsManager::buildAsyncCacheRemoveNotCompleteMetric) + .add(MetricsLevel.DEBUG, 1); + } + + public void markRemoveCompleted(String cacheName) { + removeCompleted.computeIfAbsent(cacheName, S3StreamMetricsManager::buildAsyncCacheRemoveCompleteMetric) + .add(MetricsLevel.DEBUG, 1); + } + + + public void markItemCompleteExceptionally(String cacheName) { + itemCompleteExceptionally.computeIfAbsent(cacheName, S3StreamMetricsManager::buildAsyncCacheItemCompleteExceptionallyMetric) + .add(MetricsLevel.DEBUG, 1); + } + +} diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/AsyncLRUCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/AsyncLRUCacheTest.java index 12f1610a50..ebb5a1c91c 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/cache/AsyncLRUCacheTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/cache/AsyncLRUCacheTest.java @@ -26,7 +26,7 @@ public class AsyncLRUCacheTest { @Test public void test_evictUncompleted() throws Exception { - AsyncLRUCache cache = new AsyncLRUCache<>(10); + AsyncLRUCache cache = new AsyncLRUCache<>("test_evictUncompleted", 10); MockValue v1 = spy(new MockValue()); cache.put("v1", v1); assertEquals(0, cache.completedSet.size()); @@ -59,7 +59,7 @@ public void test_evictUncompleted() throws Exception { @Test public void testPut_repeat() throws Exception { - AsyncLRUCache cache = new AsyncLRUCache<>(10); + AsyncLRUCache cache = new AsyncLRUCache<>("testPut_repeat", 10); MockValue v1 = spy(new MockValue()); cache.put("v1", v1); v1.cf.complete(10); @@ -75,7 +75,7 @@ public void testPut_repeat() throws Exception { @Test public void testRemove() throws Exception { - AsyncLRUCache cache = new AsyncLRUCache<>(10); + AsyncLRUCache cache = new AsyncLRUCache<>("testRemove", 10); MockValue v1 = spy(new MockValue()); cache.put("v1", v1); v1.cf.complete(10); @@ -108,7 +108,7 @@ public void testRemove() throws Exception { @Test public void test_asyncFail() { - AsyncLRUCache cache = new AsyncLRUCache<>(10); + AsyncLRUCache cache = new AsyncLRUCache<>("test_asyncFail", 10); MockValue v1 = new MockValue(); cache.put("v1", v1); Assertions.assertEquals(1, cache.size()); diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/ObjectReaderLRUCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/ObjectReaderLRUCacheTest.java index 1d3ef6c430..49dd3023c3 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/cache/ObjectReaderLRUCacheTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/cache/ObjectReaderLRUCacheTest.java @@ -63,7 +63,7 @@ public void testGetPut() throws ExecutionException, InterruptedException { Assertions.assertEquals(72000, objectReader2.basicObjectInfo().get().size()); Assertions.assertEquals(108000, objectReader3.basicObjectInfo().get().size()); - ObjectReaderLRUCache cache = new ObjectReaderLRUCache(100000); + ObjectReaderLRUCache cache = new ObjectReaderLRUCache("", 100000); cache.put(235L, objectReader3); cache.put(234L, objectReader2); cache.put(233L, objectReader); @@ -75,7 +75,7 @@ public void testGetPut() throws ExecutionException, InterruptedException { @Test public void testConcurrentGetPut() throws InterruptedException { - ObjectReaderLRUCache cache = new ObjectReaderLRUCache(5000); + ObjectReaderLRUCache cache = new ObjectReaderLRUCache("", 5000); List> cfs = new ArrayList<>(); Random r = new Random(); for (int i = 0; i < 100; i++) {