Skip to content

Commit

Permalink
feat(s3stream): add AsyncLRUCache metric (#1829) (#1837)
Browse files Browse the repository at this point in the history
Co-authored-by: lifepuzzlefun <[email protected]>
  • Loading branch information
SCNieh and lifepuzzlefun authored Aug 16, 2024
1 parent a09399e commit edcc838
Show file tree
Hide file tree
Showing 9 changed files with 347 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,49 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.stats.AsyncLRUCacheStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncLRUCache<K, V extends AsyncMeasurable> {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncLRUCache.class);
private final AsyncLRUCacheStats stats = AsyncLRUCacheStats.getInstance();
private final String cacheName;
private final long maxSize;
final AtomicLong totalSize = new AtomicLong(0);
final LRUCache<K, V> cache = new LRUCache<>();
final Set<V> completedSet = new HashSet<>();
final Set<V> removedSet = new HashSet<>();

public AsyncLRUCache(long maxSize) {
super();
public AsyncLRUCache(String cacheName, long maxSize) {
this.cacheName = cacheName;
if (maxSize <= 0) {
throw new IllegalArgumentException("maxSize must be positive");
}
this.maxSize = maxSize;

S3StreamMetricsManager.registerAsyncCacheSizeSupplier(this::totalSize, cacheName);
S3StreamMetricsManager.registerAsyncCacheMaxSizeSupplier(() -> maxSize, cacheName);
S3StreamMetricsManager.registerAsyncCacheItemNumberSupplier(this::size, cacheName);
}

public synchronized void put(K key, V value) {
V oldValue = cache.get(key);
if (oldValue != null && oldValue != value) {
stats.markOverWrite(cacheName);
cache.remove(key);
afterRemoveValue(oldValue);
} else {
stats.markPut(cacheName);
}

cache.put(key, value);
value.size().whenComplete((v, ex) -> {
synchronized (AsyncLRUCache.this) {
if (ex != null) {
stats.markItemCompleteExceptionally(cacheName);
cache.remove(key);
} else if (!removedSet.contains(value)) {
completedSet.add(value);
Expand All @@ -64,12 +78,18 @@ public synchronized void put(K key, V value) {
}

public synchronized V get(K key) {
return cache.get(key);
V val = cache.get(key);
if (val == null) {
stats.markMiss(cacheName);
} else {
stats.markHit(cacheName);
}
return val;
}


public synchronized boolean remove(K key) {
V value = cache.get(key);
V value = cache.get(key);
if (value == null) {
return false;
}
Expand All @@ -82,9 +102,11 @@ private synchronized void afterRemoveValue(V value) {
try {
boolean completed = completedSet.remove(value);
if (completed) {
stats.markRemoveCompleted(cacheName);
totalSize.addAndGet(-value.size().get());
value.close();
} else {
stats.markRemoveNotCompleted(cacheName);
removedSet.add(value);
}
} catch (Throwable e) {
Expand All @@ -97,6 +119,9 @@ public synchronized Map.Entry<K, V> pop() {
if (entry != null) {
afterRemoveValue(entry.getValue());
}

stats.markPop(cacheName);

return entry;
}

Expand All @@ -123,5 +148,7 @@ private synchronized void evict() {
afterRemoveValue(value);
});
}

stats.markEvict(cacheName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

public class ObjectReaderLRUCache extends AsyncLRUCache<Long, ObjectReader> {

public ObjectReaderLRUCache(int maxObjectSize) {
super(maxObjectSize);
public ObjectReaderLRUCache(String cacheName, int maxObjectSize) {
super(cacheName, maxObjectSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class DefaultObjectReaderFactory implements ObjectReaderFactory {
private final ObjectStorage objectStorage;

public DefaultObjectReaderFactory(ObjectStorage objectStorage) {
this.objectReaders = new ObjectReaderLRUCache(MAX_OBJECT_READER_SIZE);
this.objectReaders = new ObjectReaderLRUCache("ObjectReader", MAX_OBJECT_READER_SIZE);
this.objectStorage = objectStorage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void close() {

static class LRUCache extends AsyncLRUCache<Long, StreamRangeIndexCache> {
public LRUCache(int maxSize) {
super(maxSize);
super("NodeRangeIndex", maxSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ public class S3StreamMetricsConstant {
public static final String INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME = "inflight_wal_upload_tasks_count";
public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size";
public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size";
public static final String ASYNC_CACHE_EVICT_COUNT_METRIC_NAME = "async_cache_evict";
public static final String ASYNC_CACHE_HIT_COUNT_METRIC_NAME = "async_cache_hit";
public static final String ASYNC_CACHE_MISS_COUNT_METRIC_NAME = "async_cache_miss";
public static final String ASYNC_CACHE_PUT_COUNT_METRIC_NAME = "async_cache_put";
public static final String ASYNC_CACHE_POP_COUNT_METRIC_NAME = "async_cache_pop";
public static final String ASYNC_CACHE_OVERWRITE_COUNT_METRIC_NAME = "async_cache_overwrite";
public static final String ASYNC_CACHE_REMOVE_NOT_COMPLETE_COUNT_METRIC_NAME = "async_cache_remove_item_not_complete";
public static final String ASYNC_CACHE_REMOVE_COMPLETE_COUNT_METRIC_NAME = "async_cache_remove_item_complete";
public static final String ASYNC_CACHE_ITEM_COMPLETE_EXCEPTIONALLY_COUNT_METRIC_NAME = "async_cache_item_complete_exceptionally";
public static final String ASYNC_CACHE_ITEM_NUMBER_METRIC_NAME = "async_cache_item_count";
public static final String ASYNC_CACHE_ITEM_SIZE_NAME = "async_cache_item_size";
public static final String ASYNC_CACHE_ITEM_MAX_SIZE_NAME = "async_cache_max_size";
public static final String BUFFER_ALLOCATED_MEMORY_SIZE_METRIC_NAME = "buffer_allocated_memory_size";
public static final String BUFFER_USED_MEMORY_SIZE_METRIC_NAME = "buffer_used_memory_size";
public static final String READ_S3_LIMITER_TIME_METRIC_NAME = "read_s3_limiter_time";
Expand All @@ -113,6 +125,7 @@ public class S3StreamMetricsConstant {
public static final AttributeKey<String> LABEL_STATUS = AttributeKey.stringKey("status");
public static final AttributeKey<String> LABEL_TYPE = AttributeKey.stringKey("type");
public static final AttributeKey<String> LABEL_INDEX = AttributeKey.stringKey("index");
public static final AttributeKey<String> LABEL_CACHE_NAME = AttributeKey.stringKey("cacheName");
public static final String LABEL_STATUS_SUCCESS = "success";
public static final String LABEL_STATUS_FAILED = "failed";
public static final String LABEL_STATUS_HIT = "hit";
Expand Down
Loading

0 comments on commit edcc838

Please sign in to comment.