feat(s3stream): support record metrics by level for s3stream (#870)
Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Jan 2, 2024
1 parent f74d82f commit c5670e2
Showing 20 changed files with 251 additions and 120 deletions.
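Every hunk in this commit applies the same mechanical change: each S3StreamMetricsManager.record* call gains a leading MetricsLevel argument (INFO for externally visible operations, DEBUG for internal stages, locks, and allocations), so that recording can be filtered by a configured verbosity level. As a minimal sketch of what such level gating could look like, assuming a simple ordinal comparison, a hypothetical isWithin helper, and an assumed metricsLevel field (the real s3stream implementation may differ):

    // Sketch only: levels ordered from least to most verbose.
    enum MetricsLevel {
        INFO, DEBUG;

        // A DEBUG metric is emitted only when the configured level is DEBUG.
        boolean isWithin(MetricsLevel configured) {
            return this.ordinal() <= configured.ordinal();
        }
    }

    public class S3StreamMetricsManager {
        private static MetricsLevel metricsLevel = MetricsLevel.INFO; // assumed default

        public static void recordOperationLatency(MetricsLevel level, long nanos, S3Operation op) {
            if (!level.isWithin(metricsLevel)) {
                return; // below the configured verbosity: skip recording entirely
            }
            // ... update the latency histogram keyed by op ...
        }
    }

Under such a scheme, configuring INFO keeps hot-path metrics like APPEND_STREAM and FETCH_STREAM while suppressing DEBUG-level details like lock wait times and per-stage WAL upload latencies.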
2 changes: 1 addition & 1 deletion pom.xml
@@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
-<s3stream.version>0.11.0-SNAPSHOT</s3stream.version>
+<s3stream.version>0.12.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
2 changes: 1 addition & 1 deletion s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
-<version>0.11.0-SNAPSHOT</version>
+<version>0.12.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
@@ -17,6 +17,7 @@

package com.automq.stream.s3;

+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
@@ -45,7 +46,7 @@ public static ByteBuf byteBuffer(int initCapacity) {
public static ByteBuf byteBuffer(int initCapacity, String name) {
try {
if (name != null) {
-S3StreamMetricsManager.recordAllocateByteBufSize(initCapacity, name);
+S3StreamMetricsManager.recordAllocateByteBufSize(MetricsLevel.DEBUG, initCapacity, name);
}
return ALLOC.directBuffer(initCapacity);
} catch (OutOfMemoryError e) {
22 changes: 12 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -25,6 +25,7 @@
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.metadata.StreamMetadata;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -274,7 +275,7 @@ public CompletableFuture<Void> append(AppendContext context, StreamRecordBatch s
append0(context, writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
-S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
});
return cf;
}
@@ -294,7 +295,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f
if (!fromBackoff) {
backoffRecords.offer(request);
}
-S3StreamMetricsManager.recordOperationLatency(0L, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, 0L, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
@@ -370,7 +371,7 @@ public CompletableFuture<ReadDataBlock> read(FetchContext context,
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
FutureUtil.propagate(read0(context, streamId, startOffset, endOffset, maxBytes), cf);
-cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
+cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
return cf;
}

@@ -448,7 +449,7 @@ public CompletableFuture<Void> forceUpload(long streamId) {
CompletableFuture<Void> cf = new CompletableFuture<>();
// Wait for a while to group force upload tasks.
forceUploadTicker.tick().whenComplete((nil, ex) -> {
-S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT);
+S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT);
uploadDeltaWAL(streamId, true);
// Wait for all tasks contains streamId complete.
List<CompletableFuture<Void>> tasksContainsStream = this.inflightWALUploadTasks.stream()
Expand All @@ -460,7 +461,7 @@ public CompletableFuture<Void> forceUpload(long streamId) {
callbackSequencer.tryFree(streamId);
}
});
-cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_COMPLETE));
+cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_COMPLETE));
return cf;
}

@@ -492,7 +493,7 @@ private void handleAppendCallback0(WalWriteRequest request) {
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
-S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
}

private Lock getStreamCallbackLock(long streamId) {
@@ -537,7 +538,7 @@ CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
inflightWALUploadTasks.add(context);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
cf.whenComplete((nil, ex) -> {
-S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE);
+S3StreamMetricsManager.recordStageLatency(MetricsLevel.INFO, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE);
inflightWALUploadTasks.remove(context);
if (ex != null) {
LOGGER.error("upload delta WAL fail", ex);
@@ -579,10 +580,11 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {

private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
context.task.prepare().thenAcceptAsync(nil -> {
-S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_PREPARE);
+S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_PREPARE);
// 1. poll out current task and trigger upload.
DeltaWALUploadTaskContext peek = walPrepareQueue.poll();
-Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_UPLOAD));
+Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> S3StreamMetricsManager.recordStageLatency(
+    MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_UPLOAD));
// 2. add task to commit queue.
boolean walObjectCommitQueueEmpty = walCommitQueue.isEmpty();
walCommitQueue.add(peek);
@@ -599,7 +601,7 @@ private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {

private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
context.task.commit().thenAcceptAsync(nil -> {
-S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMMIT);
+S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMMIT);
// 1. poll out current task
walCommitQueue.poll();
if (context.cache.confirmOffset() != 0) {
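The call-site pattern repeated in S3Storage above, and in S3Stream, S3StreamClient, and the cache classes below, is always the same: create a TimerUtil before starting the work, then record the elapsed nanoseconds at the chosen level when the future completes. Read as a generic idiom, it could be factored into a helper like this (hypothetical, not part of this commit; TimerUtil, S3StreamMetricsManager, and S3Operation are the classes used in the diff):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;

    final class MetricsTiming {
        // Start timing before the action runs; record when its future completes,
        // whether it succeeded or failed (whenComplete runs in both cases).
        static <T> CompletableFuture<T> timed(Supplier<CompletableFuture<T>> action,
                                              MetricsLevel level, S3Operation op) {
            TimerUtil timer = new TimerUtil();
            return action.get().whenComplete((r, ex) -> S3StreamMetricsManager
                .recordOperationLatency(level, timer.elapsedAs(TimeUnit.NANOSECONDS), op));
        }
    }

A call such as timed(() -> storage.append(ctx, record), MetricsLevel.INFO, S3Operation.APPEND_STORAGE) would then be equivalent to the expanded form seen in each hunk.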
11 changes: 6 additions & 5 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -30,6 +30,7 @@
import com.automq.stream.s3.cache.CacheAccessType;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -146,7 +147,7 @@ public long nextOffset() {
public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
TimerUtil timerUtil = new TimerUtil();
writeLock.lock();
-S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
try {
CompletableFuture<AppendResult> cf = exec(() -> {
if (networkInboundLimiter != null) {
@@ -156,7 +157,7 @@ public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
-S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM);
pendingAppends.remove(cf);
});
return cf;
@@ -198,12 +199,12 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
@SpanAttribute int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
readLock.lock();
-S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
try {
CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
-S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
if (ex != null) {
Throwable cause = FutureUtil.cause(ex);
if (!(cause instanceof FastReadFailFastException)) {
@@ -266,7 +267,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
-S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.TRIM_STREAM);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.TRIM_STREAM);
});
return cf;
}, LOGGER, "trim");
@@ -21,6 +21,7 @@
import com.automq.stream.api.OpenStreamOptions;
import com.automq.stream.api.Stream;
import com.automq.stream.api.StreamClient;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -83,7 +84,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
-S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_STREAM);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_STREAM);
return openStream0(streamId, options.epoch());
}), LOGGER, "createAndOpenStream");
}
@@ -138,7 +139,7 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
-S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.OPEN_STREAM);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.OPEN_STREAM);
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
.compactedStreamObjectMaxSizeInBytes(config.streamObjectCompactionMaxSizeBytes())
.eligibleStreamObjectLivingTimeInMs(config.streamObjectCompactionLivingTimeMinutes() * 60L * 1000)
@@ -18,6 +18,7 @@
package com.automq.stream.s3.cache;

import com.automq.stream.s3.Config;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -112,7 +113,7 @@ public CompletableFuture<ReadDataBlock> read(TraceContext traceContext,
long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT;
Span.fromContext(finalTraceContext.currentContext()).setAttribute("cache_hit", isCacheHit);
-S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE, isCacheHit);
+S3StreamMetricsManager.recordReadCacheLatency(MetricsLevel.INFO, timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE, isCacheHit);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {}",
ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, totalReturnedSize);
@@ -17,6 +17,7 @@

package com.automq.stream.s3.cache;

+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -99,7 +100,7 @@ public boolean put(StreamRecordBatch recordBatch) {
} finally {
readLock.unlock();
}
-S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_LOG_CACHE);
+S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_LOG_CACHE);
return full;
}

@@ -150,7 +151,7 @@ public List<StreamRecordBatch> get(TraceContext context,

long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
boolean isCacheHit = !records.isEmpty() && records.get(0).getBaseOffset() <= startOffset;
-S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE, isCacheHit);
+S3StreamMetricsManager.recordReadCacheLatency(MetricsLevel.INFO, timeElapsed, S3Operation.READ_STORAGE_LOG_CACHE, isCacheHit);
return records;
}

@@ -17,6 +17,7 @@

package com.automq.stream.s3.cache;

+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.utils.LogContext;
@@ -99,7 +100,7 @@ public void updateReadAheadResult(long readAheadEndOffset, int readAheadSize) {
lock.lock();
this.readAheadEndOffset = readAheadEndOffset;
this.lastReadAheadSize = readAheadSize;
-S3StreamMetricsManager.recordReadAheadSize(readAheadSize);
+S3StreamMetricsManager.recordReadAheadSize(MetricsLevel.DEBUG, readAheadSize);
if (logger.isDebugEnabled()) {
logger.debug("update read ahead offset {}, size: {}, lastReadOffset: {}", readAheadEndOffset, readAheadSize, lastReadOffset);
}
@@ -20,6 +20,7 @@
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -136,7 +137,7 @@ public CompletableFuture<List<StreamRecordBatch>> syncReadAhead(TraceContext tra
completeInflightTask0(key, ex);
}
context.taskKeySet.clear();
-S3StreamMetricsManager.recordReadAheadLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, true);
+S3StreamMetricsManager.recordReadAheadLatency(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, true);
});
}

@@ -298,7 +299,7 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int
completeInflightTask0(key, ex);
}
context.taskKeySet.clear();
-S3StreamMetricsManager.recordReadAheadLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, false);
+S3StreamMetricsManager.recordReadAheadLatency(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.BLOCK_CACHE_READ_AHEAD, false);
});
}

@@ -20,6 +20,7 @@
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.S3Operator;
@@ -188,7 +189,7 @@ private void readContinuousBlocks0(List<StreamDataBlock> streamDataBlocks) {

private CompletableFuture<ByteBuf> rangeRead(long start, long end) {
return rangeRead0(start, end).whenComplete((ret, ex) ->
-S3StreamMetricsManager.recordCompactionReadSizeIn(ret.readableBytes()));
+S3StreamMetricsManager.recordCompactionReadSizeIn(MetricsLevel.INFO, ret.readableBytes()));
}

private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {