feat(s3stream): implement s3stream metrics with Opentelemetry (#819)
Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Dec 13, 2023
1 parent d1cdb6b commit 08d1462
Showing 42 changed files with 455 additions and 776 deletions.
@@ -28,7 +28,6 @@
 import com.automq.rocketmq.store.MessageStoreImpl;
 import com.automq.rocketmq.store.metrics.StoreMetricsManager;
 import com.automq.rocketmq.store.metrics.StreamMetricsManager;
-import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
 import com.google.common.base.Splitter;
 import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
 import io.opentelemetry.api.common.Attributes;
@@ -104,7 +103,6 @@ public MetricsExporter(BrokerConfig brokerConfig, MessageStoreImpl messageStore,
         this.streamMetricsManager = new StreamMetricsManager();
         this.topicMetricsManager = new TopicMetricsManager(metadataStore, s3MetadataService);
         init(resource, tracerProvider);
-        S3StreamMetricsRegistry.setMetricsGroup(this.streamMetricsManager);
     }

     public static AttributesBuilder newAttributesBuilder() {
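With the registry hook removed, s3stream records through the static S3StreamMetricsManager rather than a broker-supplied metrics group. A minimal wiring sketch under that assumption — the initializer name is hypothetical, since this excerpt shows only the removed hook, not the new entry point:

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical wiring sketch: the diff shows only the removed registry hook,
// so initMetrics(...) is an assumed name for the new manager's entry point.
public final class S3StreamMetricsWiring {
    public static void wire(OpenTelemetry openTelemetry) {
        Meter meter = openTelemetry.getMeter("com.automq.stream.s3");
        S3StreamMetricsManager.initMetrics(meter); // assumed initializer on the new static manager
    }
}
```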
2 changes: 1 addition & 1 deletion pom.xml
@@ -53,7 +53,7 @@
     <guava.version>32.1.3-jre</guava.version>
     <slf4j.version>2.0.9</slf4j.version>
     <snakeyaml.version>2.2</snakeyaml.version>
-    <s3stream.version>0.6.9-SNAPSHOT</s3stream.version>
+    <s3stream.version>0.6.10-SNAPSHOT</s3stream.version>

     <!-- Flat buffers related -->
     <flatbuffers.version>23.5.26</flatbuffers.version>
8 changes: 7 additions & 1 deletion s3stream/pom.xml
@@ -22,7 +22,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.automq.elasticstream</groupId>
     <artifactId>s3stream</artifactId>
-    <version>0.6.9-SNAPSHOT</version>
+    <version>0.6.10-SNAPSHOT</version>
     <properties>
         <mockito-core.version>5.5.0</mockito-core.version>
         <junit-jupiter.version>5.10.0</junit-jupiter.version>
@@ -34,6 +34,7 @@
         <maven.compiler.source>17</maven.compiler.source>
         <maven.compiler.target>17</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <alapha.opentelemetry.version>1.32.0-alpha</alapha.opentelemetry.version>
     </properties>
     <dependencies>
         <dependency>
@@ -112,6 +113,11 @@
             <artifactId>jackson-databind</artifactId>
             <version>2.16.0</version>
         </dependency>
+        <dependency>
+            <groupId>io.opentelemetry.instrumentation</groupId>
+            <artifactId>opentelemetry-runtime-telemetry-java17</artifactId>
+            <version>${alapha.opentelemetry.version}</version>
+        </dependency>
     </dependencies>

     <build>
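The new opentelemetry-runtime-telemetry-java17 dependency provides JFR-based JVM runtime metrics (GC, memory pools, threads, CPU). A usage sketch against an existing SDK instance, assuming the broker bootstraps it alongside its exporter; RuntimeMetrics.create(...) is the library's convenience entry point:

```java
import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics;
import io.opentelemetry.sdk.OpenTelemetrySdk;

public final class RuntimeTelemetryBootstrap {
    public static RuntimeMetrics start(OpenTelemetrySdk sdk) {
        // Registers JVM runtime instruments (GC, memory pools, threads, ...)
        // backed by JFR events; close the returned handle on shutdown.
        return RuntimeMetrics.create(sdk);
    }
}
```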
@@ -17,7 +17,7 @@

 package com.automq.stream.s3;

-import com.automq.stream.s3.metrics.stats.ByteBufMetricsStats;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.utils.Threads;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
@@ -45,11 +45,11 @@ public static ByteBuf byteBuffer(int initCapacity) {
     public static ByteBuf byteBuffer(int initCapacity, String name) {
         try {
             if (name != null) {
-                ByteBufMetricsStats.getHistogram(name).update(initCapacity);
+                S3StreamMetricsManager.recordAllocateByteBufSize(initCapacity, name);
             }
             return ALLOC.directBuffer(initCapacity);
         } catch (OutOfMemoryError e) {
-            for (;;) {
+            for (; ; ) {
                 int freedBytes = 0;
                 for (OOMHandler handler : OOM_HANDLERS) {
                     freedBytes += handler.handle(initCapacity);
@@ -77,6 +77,7 @@ public static void registerOOMHandlers(OOMHandler handler) {
     public interface OOMHandler {
         /**
          * Try handle OOM exception.
+         *
          * @param memoryRequired the memory required
          * @return freed memory.
          */
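The call above replaces a per-name histogram lookup with one static recorder. A plausible shape for recordAllocateByteBufSize — not shown in this diff, so instrument and class names here are assumptions — is a single OpenTelemetry histogram with the buffer name carried as an attribute:

```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;

// Sketch of the recorder's likely shape; only the method name appears in the diff.
public final class ByteBufSizeRecorder {
    private static final AttributeKey<String> NAME = AttributeKey.stringKey("name");
    private final LongHistogram allocateSize;

    public ByteBufSizeRecorder(Meter meter) {
        this.allocateSize = meter.histogramBuilder("allocate_byte_buf_size")
            .setUnit("bytes")
            .ofLongs()
            .build();
    }

    public void recordAllocateByteBufSize(long size, String bufferName) {
        // One instrument, many series: the buffer name becomes a metric attribute
        // instead of keying a map of separate histograms.
        allocateSize.record(size, Attributes.of(NAME, bufferName));
    }
}
```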
@@ -17,13 +17,13 @@

 package com.automq.stream.s3;

+import com.automq.stream.s3.metadata.ObjectUtils;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.objects.ObjectStreamRange;
 import com.automq.stream.s3.operator.S3Operator;
 import com.automq.stream.s3.operator.Writer;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
-import com.automq.stream.s3.metadata.ObjectUtils;

 import java.util.ArrayList;
 import java.util.Collections;
17 changes: 9 additions & 8 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -24,9 +24,9 @@
 import com.automq.stream.s3.cache.ReadDataBlock;
 import com.automq.stream.s3.cache.S3BlockCache;
 import com.automq.stream.s3.metadata.StreamMetadata;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.objects.ObjectManager;
 import com.automq.stream.s3.operator.S3Operator;
@@ -259,7 +259,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         append0(writeRequest, false);
         cf.whenComplete((nil, ex) -> {
             streamRecord.release();
-            OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
         });
         return cf;
     }
@@ -279,7 +279,7 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
             if (!fromBackoff) {
                 backoffRecords.offer(request);
             }
-            OperationMetricsStats.getCounter(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).inc();
+            S3StreamMetricsManager.recordOperationNum(1, S3Operation.APPEND_STORAGE_LOG_CACHE_FULL);
             if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
                 LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize);
                 lastLogTimestamp = System.currentTimeMillis();
@@ -344,7 +344,7 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
         TimerUtil timerUtil = new TimerUtil();
         CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
         FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, readOptions), cf);
-        cf.whenComplete((nil, ex) -> OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
+        cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
         return cf;
     }

@@ -363,6 +363,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
             endOffset = logCacheRecords.get(0).getBaseOffset();
         }
         Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES);
+        long finalEndOffset = endOffset;
         return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(readDataBlock -> {
             List<StreamRecordBatch> rst = new ArrayList<>(readDataBlock.getRecords());
             int remainingBytesSize = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum();
@@ -383,7 +384,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
             timeout.cancel();
             if (ex != null) {
                 LOGGER.error("read from block cache failed, stream={}, {}-{}, maxBytes: {}",
-                    streamId, startOffset, maxBytes, ex);
+                    streamId, startOffset, finalEndOffset, maxBytes, ex);
                 logCacheRecords.forEach(StreamRecordBatch::release);
             }
         });
@@ -396,7 +397,7 @@ private void continuousCheck(List<StreamRecordBatch> records) {
                 expectStartOffset = record.getLastOffset();
             } else {
                 throw new IllegalArgumentException(String.format("Continuous check failed, expect offset: %d," +
-                    " actual: %d, records: %s", expectStartOffset, record.getBaseOffset() ,records));
+                    " actual: %d, records: %s", expectStartOffset, record.getBaseOffset(), records));
             }
         }
     }
@@ -444,7 +445,7 @@ private void handleAppendCallback(WalWriteRequest request) {
         for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
             waitingAckRequest.cf.complete(null);
         }
-        OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_APPEND_CALLBACK).update(timer.elapsedAs(TimeUnit.NANOSECONDS));
+        S3StreamMetricsManager.recordOperationLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE_APPEND_CALLBACK);
     }

     private Lock getStreamCallbackLock(long streamId) {
@@ -488,7 +489,7 @@ CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
         inflightWALUploadTasks.add(cf);
         backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
         cf.whenComplete((nil, ex) -> {
-            OperationMetricsStats.getHistogram(S3Operation.UPLOAD_STORAGE_WAL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.UPLOAD_STORAGE_WAL);
             inflightWALUploadTasks.remove(cf);
             if (ex != null) {
                 LOGGER.error("upload delta WAL fail", ex);
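Every OperationMetricsStats.getHistogram(...).update(...) and getCounter(...).inc() above collapses into two static entry points keyed by the operation enum. A sketch of what those entry points plausibly look like on OpenTelemetry — instrument names, the initializer, and the nested stand-in enum are assumptions; only the method signatures appear in this diff:

```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;

// Sketch only: the diff shows the call sites, not this class body.
public final class OperationMetricsSketch {
    private static final AttributeKey<String> OPERATION = AttributeKey.stringKey("operation");
    private static LongHistogram operationLatency;
    private static LongCounter operationCount;

    // The real manager presumably guards against use before initialization.
    public static void initMetrics(Meter meter) {
        operationLatency = meter.histogramBuilder("operation_latency")
            .setUnit("ns")
            .ofLongs()
            .build();
        operationCount = meter.counterBuilder("operation_count").build();
    }

    public static void recordOperationLatency(long nanos, S3Operation operation) {
        operationLatency.record(nanos, Attributes.of(OPERATION, operation.name()));
    }

    public static void recordOperationNum(long delta, S3Operation operation) {
        operationCount.add(delta, Attributes.of(OPERATION, operation.name()));
    }

    // Stand-in for com.automq.stream.s3.metrics.operations.S3Operation.
    enum S3Operation { APPEND_STORAGE, READ_STORAGE, APPEND_STORAGE_LOG_CACHE_FULL }
}
```

The payoff is one histogram and one counter with an operation attribute, rather than one instrument object per operation constant.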
12 changes: 6 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -29,9 +29,9 @@
 import com.automq.stream.api.exceptions.FastReadFailFastException;
 import com.automq.stream.api.exceptions.StreamClientException;
 import com.automq.stream.s3.cache.CacheAccessType;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
 import com.automq.stream.s3.streams.StreamManager;
@@ -141,7 +141,7 @@ public long nextOffset() {
     public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
         TimerUtil timerUtil = new TimerUtil();
         writeLock.lock();
-        OperationMetricsStats.getHistogram(S3Operation.APPEND_STREAM_WRITE_LOCK).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+        S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
         try {
             CompletableFuture<AppendResult> cf = exec(() -> {
                 if (networkInboundLimiter != null) {
@@ -151,7 +151,7 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
             }, LOGGER, "append");
             pendingAppends.add(cf);
             cf.whenComplete((nil, ex) -> {
-                OperationMetricsStats.getHistogram(S3Operation.APPEND_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+                S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM);
                 pendingAppends.remove(cf);
             });
             return cf;
@@ -188,12 +188,12 @@ private CompletableFuture<AppendResult> append0(RecordBatch recordBatch) {
     public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes, ReadOptions readOptions) {
         TimerUtil timerUtil = new TimerUtil();
         readLock.lock();
-        OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM_READ_LOCK).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+        S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
         try {
             CompletableFuture<FetchResult> cf = exec(() -> fetch0(startOffset, endOffset, maxBytes, readOptions), LOGGER, "fetch");
             pendingFetches.add(cf);
             cf.whenComplete((rs, ex) -> {
-                OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+                S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
                 if (ex != null) {
                     Throwable cause = FutureUtil.cause(ex);
                     if (!(cause instanceof FastReadFailFastException)) {
@@ -255,7 +255,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
             lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
             this.lastPendingTrim = cf;
             cf.whenComplete((nil, ex) -> {
-                OperationMetricsStats.getHistogram(S3Operation.TRIM_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+                S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.TRIM_STREAM);
             });
             return cf;
         }, LOGGER, "trim");
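Note the pattern in append and fetch: a single TimerUtil is sampled twice, first right after the lock is acquired (the *_WRITE_LOCK / *_READ_LOCK operations) and again when the returned future completes. A condensed sketch of that pattern, assuming TimerUtil measures from construction without resetting (so the second sample subsumes the lock wait); doWork() and record() are stand-ins for the real body and the metrics call:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Pattern sketch only: doWork() and record() stand in for the real code.
class TimedLockPattern {
    private final Lock writeLock = new ReentrantLock();

    CompletableFuture<Void> append() {
        long start = System.nanoTime(); // what TimerUtil wraps
        writeLock.lock();
        // Sample 1: how long the caller waited for the lock.
        record("append_write_lock", System.nanoTime() - start);
        try {
            CompletableFuture<Void> cf = doWork();
            // Sample 2, same timer: end-to-end latency including the lock wait.
            cf.whenComplete((nil, ex) -> record("append", System.nanoTime() - start));
            return cf;
        } finally {
            writeLock.unlock();
        }
    }

    private CompletableFuture<Void> doWork() { return CompletableFuture.completedFuture(null); }
    private void record(String op, long nanos) { /* forward to the metrics backend */ }
}
```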
@@ -21,9 +21,9 @@
 import com.automq.stream.api.OpenStreamOptions;
 import com.automq.stream.api.Stream;
 import com.automq.stream.api.StreamClient;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
 import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
 import com.automq.stream.s3.objects.ObjectManager;
 import com.automq.stream.s3.operator.S3Operator;
@@ -84,7 +84,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
     public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
         TimerUtil timerUtil = new TimerUtil();
         return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
-            OperationMetricsStats.getHistogram(S3Operation.CREATE_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.CREATE_STREAM);
             return openStream0(streamId, options.epoch());
         }), LOGGER, "createAndOpenStream");
     }
@@ -139,7 +139,7 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
         TimerUtil timerUtil = new TimerUtil();
         return streamManager.openStream(streamId, epoch).
             thenApply(metadata -> {
-                OperationMetricsStats.getHistogram(S3Operation.OPEN_STREAM).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+                S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.OPEN_STREAM);
                 StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
                     .compactedStreamObjectMaxSizeInBytes(config.streamObjectCompactionMaxSizeBytes())
                     .eligibleStreamObjectLivingTimeInMs(config.streamObjectCompactionLivingTimeMinutes() * 60L * 1000)
@@ -18,9 +18,9 @@
 package com.automq.stream.s3.cache;

 import com.automq.stream.s3.Config;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.operations.S3Operation;
-import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.objects.ObjectManager;
 import com.automq.stream.s3.operator.S3Operator;
@@ -86,7 +86,7 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
         FutureUtil.exec(() -> {
             read0(streamId, startOffset, endOffset, maxBytes, agent, uuid).whenComplete((ret, ex) -> {
                 if (ex != null) {
-                    LOGGER.error("read {} [{}, {}) from block cache fail", streamId, startOffset, endOffset, ex);
+                    LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail", streamId, startOffset, endOffset, maxBytes, ex);
                     readCf.completeExceptionally(ex);
                     this.inflightReadThrottle.release(uuid);
                     return;
@@ -95,16 +95,13 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
                 this.readAheadManager.updateReadResult(streamId, startOffset,
                     ret.getRecords().get(ret.getRecords().size() - 1).getLastOffset(), totalReturnedSize);

-                if (ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT) {
-                    OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE).inc();
-                } else {
-                    OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE_MISS).inc();
-                }
+                long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
+                boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT;
+                S3StreamMetricsManager.recordReadCacheLatency(timeElapsed, S3Operation.READ_STORAGE_BLOCK_CACHE_HIT, isCacheHit);
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {} ",
                         ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, totalReturnedSize);
                 }
-                OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_BLOCK_CACHE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
                 readCf.complete(ret);
                 this.inflightReadThrottle.release(uuid);
             });
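The two hit/miss counters plus a separate latency histogram merge into one recording that carries the outcome as a flag. One plausible OpenTelemetry-side shape — assumed, since the diff only shows the call site — maps that flag to a status attribute on a single histogram:

```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;

// Sketch: one instrument, hit/miss encoded as an attribute instead of two counters.
final class ReadCacheLatencyRecorder {
    private static final AttributeKey<String> OPERATION = AttributeKey.stringKey("operation");
    private static final AttributeKey<String> STATUS = AttributeKey.stringKey("status");
    private final LongHistogram readCacheLatency;

    ReadCacheLatencyRecorder(Meter meter) {
        this.readCacheLatency = meter.histogramBuilder("read_cache_latency")
            .setUnit("ns")
            .ofLongs()
            .build();
    }

    void recordReadCacheLatency(long nanos, String operation, boolean isCacheHit) {
        // The histogram's count per status series doubles as the old hit/miss counters.
        readCacheLatency.record(nanos, Attributes.of(
            OPERATION, operation,
            STATUS, isCacheHit ? "hit" : "miss"));
    }
}
```

With this shape, a hit ratio falls out of the backend by dividing the hit-status series by the total, instead of keeping two counters in lockstep with the histogram.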
@@ -17,7 +17,7 @@

 package com.automq.stream.s3.cache;

-import com.automq.stream.s3.metrics.stats.BlockCacheMetricsStats;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.utils.ThreadUtils;
 import com.automq.stream.utils.Threads;
 import com.automq.stream.utils.Utils;
@@ -56,7 +56,7 @@ public InflightReadThrottle(int maxInflightReadBytes) {
         this.maxInflightReadBytes = maxInflightReadBytes;
         this.remainingInflightReadBytes = maxInflightReadBytes;
         executorService.execute(this);
-        BlockCacheMetricsStats.registerAvailableInflightReadSize(this::getRemainingInflightReadBytes);
+        S3StreamMetricsManager.registerInflightReadSizeLimiterGauge(this::getRemainingInflightReadBytes);
     }

     public void shutdown() {
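registerInflightReadSizeLimiterGauge hands the manager a supplier rather than a value, which matches OpenTelemetry's asynchronous-gauge model: the SDK polls the callback at each collection instead of the throttle pushing updates. A sketch of how such a registration might look (assumed shape; only the registering call appears in the diff):

```java
import io.opentelemetry.api.metrics.Meter;
import java.util.function.Supplier;

// Sketch: an observable gauge that polls the throttle on every metrics collection.
final class InflightReadGauge {
    static void register(Meter meter, Supplier<Integer> remainingBytes) {
        meter.gaugeBuilder("available_inflight_read_bytes")
            .setUnit("bytes")
            .ofLongs()
            .buildWithCallback(measurement -> measurement.record(remainingBytes.get()));
    }
}
```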