Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): add AsyncLRUCache metric #1829

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,49 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.stats.AsyncLRUCacheStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncLRUCache<K, V extends AsyncMeasurable> {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncLRUCache.class);
private final AsyncLRUCacheStats stats = AsyncLRUCacheStats.getInstance();
private final String cacheName;
private final long maxSize;
final AtomicLong totalSize = new AtomicLong(0);
final LRUCache<K, V> cache = new LRUCache<>();
final Set<V> completedSet = new HashSet<>();
final Set<V> removedSet = new HashSet<>();

public AsyncLRUCache(long maxSize) {
super();
public AsyncLRUCache(String cacheName, long maxSize) {
this.cacheName = cacheName;
if (maxSize <= 0) {
throw new IllegalArgumentException("maxSize must be positive");
}
this.maxSize = maxSize;

S3StreamMetricsManager.registerAsyncCacheSizeSupplier(this::totalSize, cacheName);
S3StreamMetricsManager.registerAsyncCacheMaxSizeSupplier(() -> maxSize, cacheName);
S3StreamMetricsManager.registerAsyncCacheItemNumberSupplier(this::size, cacheName);
}

public synchronized void put(K key, V value) {
V oldValue = cache.get(key);
if (oldValue != null && oldValue != value) {
stats.markOverWrite(cacheName);
cache.remove(key);
afterRemoveValue(oldValue);
} else {
stats.markPut(cacheName);
}

cache.put(key, value);
value.size().whenComplete((v, ex) -> {
synchronized (AsyncLRUCache.this) {
if (ex != null) {
stats.markItemCompleteExceptionally(cacheName);
cache.remove(key);
} else if (!removedSet.contains(value)) {
completedSet.add(value);
Expand All @@ -64,12 +78,18 @@ public synchronized void put(K key, V value) {
}

public synchronized V get(K key) {
return cache.get(key);
V val = cache.get(key);
if (val == null) {
stats.markMiss(cacheName);
} else {
stats.markHit(cacheName);
}
return val;
}


public synchronized boolean remove(K key) {
V value = cache.get(key);
V value = cache.get(key);
if (value == null) {
return false;
}
Expand All @@ -82,9 +102,11 @@ private synchronized void afterRemoveValue(V value) {
try {
boolean completed = completedSet.remove(value);
if (completed) {
stats.markRemoveCompleted(cacheName);
totalSize.addAndGet(-value.size().get());
value.close();
} else {
stats.markRemoveNotCompleted(cacheName);
removedSet.add(value);
}
} catch (Throwable e) {
Expand All @@ -97,6 +119,9 @@ public synchronized Map.Entry<K, V> pop() {
if (entry != null) {
afterRemoveValue(entry.getValue());
}

stats.markPop(cacheName);

return entry;
}

Expand All @@ -123,5 +148,7 @@ private synchronized void evict() {
afterRemoveValue(value);
});
}

stats.markEvict(cacheName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

public class ObjectReaderLRUCache extends AsyncLRUCache<Long, ObjectReader> {

public ObjectReaderLRUCache(int maxObjectSize) {
super(maxObjectSize);
public ObjectReaderLRUCache(String cacheName, int maxObjectSize) {
super(cacheName, maxObjectSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class DefaultObjectReaderFactory implements ObjectReaderFactory {
private final ObjectStorage objectStorage;

public DefaultObjectReaderFactory(ObjectStorage objectStorage) {
this.objectReaders = new ObjectReaderLRUCache(MAX_OBJECT_READER_SIZE);
this.objectReaders = new ObjectReaderLRUCache("ObjectReader", MAX_OBJECT_READER_SIZE);
this.objectStorage = objectStorage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void close() {

static class LRUCache extends AsyncLRUCache<Long, StreamRangeIndexCache> {
public LRUCache(int maxSize) {
super(maxSize);
super("NodeRangeIndex", maxSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ public class S3StreamMetricsConstant {
public static final String INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME = "inflight_wal_upload_tasks_count";
public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size";
public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size";
public static final String ASYNC_CACHE_EVICT_COUNT_METRIC_NAME = "async_cache_evict";
public static final String ASYNC_CACHE_HIT_COUNT_METRIC_NAME = "async_cache_hit";
public static final String ASYNC_CACHE_MISS_COUNT_METRIC_NAME = "async_cache_miss";
public static final String ASYNC_CACHE_PUT_COUNT_METRIC_NAME = "async_cache_put";
public static final String ASYNC_CACHE_POP_COUNT_METRIC_NAME = "async_cache_pop";
public static final String ASYNC_CACHE_OVERWRITE_COUNT_METRIC_NAME = "async_cache_overwrite";
public static final String ASYNC_CACHE_REMOVE_NOT_COMPLETE_COUNT_METRIC_NAME = "async_cache_remove_item_not_complete";
public static final String ASYNC_CACHE_REMOVE_COMPLETE_COUNT_METRIC_NAME = "async_cache_remove_item_complete";
public static final String ASYNC_CACHE_ITEM_COMPLETE_EXCEPTIONALLY_COUNT_METRIC_NAME = "async_cache_item_complete_exceptionally";
public static final String ASYNC_CACHE_ITEM_NUMBER_METRIC_NAME = "async_cache_item_count";
public static final String ASYNC_CACHE_ITEM_SIZE_NAME = "async_cache_item_size";
public static final String ASYNC_CACHE_ITEM_MAX_SIZE_NAME = "async_cache_max_size";
public static final String BUFFER_ALLOCATED_MEMORY_SIZE_METRIC_NAME = "buffer_allocated_memory_size";
public static final String BUFFER_USED_MEMORY_SIZE_METRIC_NAME = "buffer_used_memory_size";
public static final String READ_S3_LIMITER_TIME_METRIC_NAME = "read_s3_limiter_time";
Expand All @@ -113,6 +125,7 @@ public class S3StreamMetricsConstant {
public static final AttributeKey<String> LABEL_STATUS = AttributeKey.stringKey("status");
public static final AttributeKey<String> LABEL_TYPE = AttributeKey.stringKey("type");
public static final AttributeKey<String> LABEL_INDEX = AttributeKey.stringKey("index");
public static final AttributeKey<String> LABEL_CACHE_NAME = AttributeKey.stringKey("cacheName");
public static final String LABEL_STATUS_SUCCESS = "success";
public static final String LABEL_STATUS_FAILED = "failed";
public static final String LABEL_STATUS_HIT = "hit";
Expand Down
Loading
Loading