Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): add more metrics #845

Merged
merged 3 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
if (!fromBackoff) {
backoffRecords.offer(request);
}
S3StreamMetricsManager.recordOperationNum(1, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
S3StreamMetricsManager.recordOperationLatency(0L, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
Expand Down Expand Up @@ -529,8 +529,14 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {
}
rate = maxDataWriteRate;
}
context.task = DeltaWALUploadTask.builder().config(config).streamRecordsMap(context.cache.records())
.objectManager(objectManager).s3Operator(s3Operator).executor(uploadWALExecutor).rate(rate).build();
context.task = DeltaWALUploadTask.builder()
.config(config)
.streamRecordsMap(context.cache.records())
.objectManager(objectManager)
.s3Operator(s3Operator)
.executor(uploadWALExecutor)
.rate(rate)
.build();
boolean walObjectPrepareQueueEmpty = walPrepareQueue.isEmpty();
walPrepareQueue.add(context);
if (!walObjectPrepareQueueEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.cache.DefaultS3BlockCache.ReadAheadRecord;
import com.automq.stream.utils.biniarysearch.StreamRecordBatchList;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class BlockCache implements DirectByteBufAlloc.OOMHandler {
public BlockCache(long maxSize) {
this.maxSize = maxSize;
DirectByteBufAlloc.registerOOMHandlers(this);
S3StreamMetricsManager.registerBlockCacheSizeSupplier(size::get);
}

public void registerListener(CacheEvictListener listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo

long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT;
S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE_HIT, isCacheHit);
S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE, isCacheHit);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {} ",
ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, totalReturnedSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public InflightReadThrottle(int maxInflightReadBytes) {
this.maxInflightReadBytes = maxInflightReadBytes;
this.remainingInflightReadBytes = maxInflightReadBytes;
executorService.execute(this);
S3StreamMetricsManager.registerInflightReadSizeLimiterGauge(this::getRemainingInflightReadBytes);
S3StreamMetricsManager.registerInflightReadSizeLimiterSupplier(this::getRemainingInflightReadBytes);
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo
this.activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
this.blocks.add(activeBlock);
this.blockFreeListener = blockFreeListener;
S3StreamMetricsManager.registerDeltaWalCacheSizeSupplier(size::get);
}

public LogCache(long capacity, long cacheBlockMaxSize) {
Expand Down Expand Up @@ -136,7 +137,7 @@ public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffs

long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = !records.isEmpty() && records.get(0).getBaseOffset() <= startOffset;
S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE_HIT, isCacheHit);
S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE, isCacheHit);
return records;
}

Expand Down
22 changes: 0 additions & 22 deletions s3stream/src/main/java/com/automq/stream/s3/metrics/Gauge.java

This file was deleted.

25 changes: 0 additions & 25 deletions s3stream/src/main/java/com/automq/stream/s3/metrics/NoopGauge.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,56 @@

import io.opentelemetry.api.common.AttributeKey;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class S3StreamMetricsConstant {
// value = 16KB * 2^i
public static final String[] OBJECT_SIZE_BUCKET_NAMES = {"16KB",
"32KB",
"64KB",
"128KB",
"256KB",
"512KB",
"1MB",
"2MB",
"4MB",
"8MB",
"16MB",
"32MB",
"64MB",
"128MB",
"inf"};
public static final List<Long> OPERATION_LATENCY_BOUNDARIES = List.of(
TimeUnit.MICROSECONDS.toNanos(1),
TimeUnit.MICROSECONDS.toNanos(10),
TimeUnit.MICROSECONDS.toNanos(100),
TimeUnit.MILLISECONDS.toNanos(1),
TimeUnit.MILLISECONDS.toNanos(3),
TimeUnit.MILLISECONDS.toNanos(5),
TimeUnit.MILLISECONDS.toNanos(7),
TimeUnit.MILLISECONDS.toNanos(10),
TimeUnit.MILLISECONDS.toNanos(20),
TimeUnit.MILLISECONDS.toNanos(30),
TimeUnit.MILLISECONDS.toNanos(40),
TimeUnit.MILLISECONDS.toNanos(50),
TimeUnit.MILLISECONDS.toNanos(60),
TimeUnit.MILLISECONDS.toNanos(70),
TimeUnit.MILLISECONDS.toNanos(80),
TimeUnit.MILLISECONDS.toNanos(90),
TimeUnit.MILLISECONDS.toNanos(100),
TimeUnit.MILLISECONDS.toNanos(200),
TimeUnit.MILLISECONDS.toNanos(500),
TimeUnit.SECONDS.toNanos(1),
TimeUnit.SECONDS.toNanos(3),
TimeUnit.SECONDS.toNanos(5),
TimeUnit.SECONDS.toNanos(10),
TimeUnit.SECONDS.toNanos(30),
TimeUnit.MINUTES.toNanos(1),
TimeUnit.MINUTES.toNanos(3),
TimeUnit.MINUTES.toNanos(5)
);

public static final String UPLOAD_SIZE_METRIC_NAME = "upload_size_total";
public static final String DOWNLOAD_SIZE_METRIC_NAME = "download_size_total";
public static final String OPERATION_COUNT_METRIC_NAME = "operation_count_total";
Expand All @@ -36,11 +85,18 @@ public class S3StreamMetricsConstant {
public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME = "network_outbound_limiter_queue_size";
public static final String ALLOCATE_BYTE_BUF_SIZE_METRIC_NAME = "allocate_byte_buf_size";
public static final String READ_AHEAD_SIZE_METRIC_NAME = "read_ahead_size";
public static final String WAL_START_OFFSET = "wal_start_offset";
public static final String WAL_TRIMMED_OFFSET = "wal_trimmed_offset";
public static final String DELTA_WAL_CACHE_SIZE = "delta_wal_cache_size";
public static final String BLOCK_CACHE_SIZE = "block_cache_size";
public static final String AVAILABLE_INFLIGHT_READ_AHEAD_SIZE_METRIC_NAME = "available_inflight_read_ahead_size";
public static final String AVAILABLE_S3_INFLIGHT_READ_QUOTA_METRIC_NAME = "available_s3_inflight_read_quota";
public static final String AVAILABLE_S3_INFLIGHT_WRITE_QUOTA_METRIC_NAME = "available_s3_inflight_write_quota";
public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size_total";
public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size_total";
public static final AttributeKey<String> LABEL_OPERATION_TYPE = AttributeKey.stringKey("operation_type");
public static final AttributeKey<String> LABEL_OPERATION_NAME = AttributeKey.stringKey("operation_name");
public static final AttributeKey<String> LABEL_OBJECT_SIZE_NAME = AttributeKey.stringKey("size");
public static final AttributeKey<String> LABEL_APPEND_WAL_STAGE = AttributeKey.stringKey("stage");
public static final AttributeKey<String> LABEL_CACHE_STATUS = AttributeKey.stringKey("status");
public static final AttributeKey<String> LABEL_OBJECT_STAGE = AttributeKey.stringKey("stage");
Expand Down
Loading
Loading