diff --git a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
index d5252ddaf4..cba6d7bf53 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
@@ -45,6 +45,7 @@ public class ByteBufAlloc {
     public static final int STREAM_OBJECT_COMPACTION_WRITE = 8;
     public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9;
     public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;
+    public static final int BLOCK_CACHE = 11;
     public static ByteBufAllocMetric byteBufAllocMetric = null;

     /**
@@ -68,7 +69,7 @@ public class ByteBufAlloc {
         registerAllocType(STREAM_OBJECT_COMPACTION_WRITE, "stream_object_compaction_write");
         registerAllocType(STREAM_SET_OBJECT_COMPACTION_READ, "stream_set_object_compaction_read");
         registerAllocType(STREAM_SET_OBJECT_COMPACTION_WRITE, "stream_set_object_compaction_write");
-
+        registerAllocType(BLOCK_CACHE, "block_cache");
     }

     /**
diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
index abe937ebe0..3e8b5a2174 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -28,6 +28,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE;
 import static com.automq.stream.s3.ByteBufAlloc.READ_INDEX_BLOCK;
 import static com.automq.stream.s3.ObjectWriter.Footer.FOOTER_SIZE;
 import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;
@@ -71,7 +72,12 @@ public CompletableFuture<FindIndexResult> find(long streamId, long startOffset,
     public CompletableFuture<DataBlockGroup> read(DataBlockIndex block) {
         CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition(), ThrottleStrategy.CATCH_UP);
-        return rangeReadCf.thenApply(DataBlockGroup::new);
+        return rangeReadCf.thenApply(buf -> {
+            ByteBuf pooled = ByteBufAlloc.byteBuffer(buf.readableBytes(), BLOCK_CACHE);
+            pooled.writeBytes(buf);
+            buf.release();
+            return new DataBlockGroup(pooled);
+        });
     }

     void asyncGetBasicObjectInfo() {
@@ -392,7 +398,7 @@ public static class DataBlockGroup implements AutoCloseable {
         private final int recordCount;

         public DataBlockGroup(ByteBuf buf) {
-            this.buf = buf.duplicate();
+            this.buf = buf;
             this.recordCount = check(buf);
         }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java
index cf34d13460..8ef7b4bbf6 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java
@@ -16,12 +16,14 @@
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.utils.CloseableIterator;
 import io.netty.buffer.ByteBuf;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCounted;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;

-@EventLoopSafe public class DataBlock {
+@EventLoopSafe public class DataBlock extends AbstractReferenceCounted {
     private static final int UNREAD_INIT = -1;
     private final long objectId;
     private final DataBlockIndex dataBlockIndex;
@@ -58,9 +60,7 @@ public CompletableFuture<DataBlock> dataFuture() {
     }

     public void free() {
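+        // free() now routes through release(); the group buffer is actually freed in deallocate() once refCnt reaches zero.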
-        if (dataBlockGroup != null) {
-            dataBlockGroup.release();
-        }
+        release();
         freeCf.complete(this);
     }
@@ -101,14 +101,19 @@ public void markRead() {
         }
     }

-    public void retain() {
-        dataBlockGroup.retain();
+    @Override
+    public ReferenceCounted touch(Object hint) {
+        return null;
     }

-    public void release() {
-        dataBlockGroup.release();
+    @Override
+    protected void deallocate() {
+        if (dataBlockGroup != null) {
+            dataBlockGroup.release();
+        }
     }

+    // Only for test
     ByteBuf dataBuf() {
         return dataBlockGroup.buf();
     }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java
index 57cdcfecc7..2e53d8a56e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java
@@ -14,9 +14,12 @@
 import com.automq.stream.s3.DataBlockIndex;
 import com.automq.stream.s3.ObjectReader;
 import com.automq.stream.s3.cache.LRUCache;
+import com.automq.stream.s3.metrics.MetricsLevel;
+import com.automq.stream.s3.metrics.S3StreamMetricsManager;
+import com.automq.stream.s3.metrics.stats.StorageOperationStats;
+import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.threads.EventLoop;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +48,7 @@ public DataBlockCache(long maxSize, EventLoop[] eventLoops) {
         for (int i = 0; i < eventLoops.length; i++) {
             caches[i] = new Cache(eventLoops[i]);
         }
+        S3StreamMetricsManager.registerBlockCacheSizeSupplier(() -> maxSize - sizeLimiter.permits());
     }

     /**
@@ -61,6 +65,10 @@ public CompletableFuture<DataBlock> getBlock(ObjectReader objectReader, DataBloc
         return cache.getBlock(objectReader, dataBlockIndex);
     }

+    public long available() {
+        return sizeLimiter.permits();
+    }
+
     @Override
     public String toString() {
         return "DataBlockCache{" +
@@ -107,7 +115,6 @@ public int hashCode() {
 class Cache implements ReadStatusChangeListener {
     final Map<DataBlockGroupKey, DataBlock> blocks = new HashMap<>();
     final LRUCache<DataBlockGroupKey, DataBlock> lru = new LRUCache<>();
-    final Map<DataBlockGroupKey, DataBlock> inactive = new HashMap<>();
     private final EventLoop eventLoop;

     public Cache(EventLoop eventLoop) {
     }

     public CompletableFuture<DataBlock> getBlock(ObjectReader objectReader, DataBlockIndex dataBlockIndex) {
+        return FutureUtil.exec(() -> getBlock0(objectReader, dataBlockIndex), LOGGER, "getBlock");
+    }
+
+    private CompletableFuture<DataBlock> getBlock0(ObjectReader objectReader, DataBlockIndex dataBlockIndex) {
         long objectId = objectReader.metadata().objectId();
         DataBlockGroupKey key = new DataBlockGroupKey(objectId, dataBlockIndex);
         DataBlock dataBlock = blocks.get(key);
@@ -128,12 +139,18 @@ public CompletableFuture<DataBlock> getBlock(ObjectReader objectReader, DataBloc
         CompletableFuture<DataBlock> cf = new CompletableFuture<>();
         // if the data is already loaded, the listener will be invoked right now,
         // else the listener will be invoked immediately after data loaded in the same eventLoop.
+        if (dataBlock.dataFuture().isDone()) {
+            StorageOperationStats.getInstance().blockCacheBlockHitThroughput.add(MetricsLevel.INFO, dataBlock.dataBlockIndex().size());
+        } else {
+            StorageOperationStats.getInstance().blockCacheBlockMissThroughput.add(MetricsLevel.INFO, dataBlock.dataBlockIndex().size());
+        }
+        // DataBlock#retain must be invoked before the future completes, to prevent another read from using #markRead to actually free the data block.
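+        // The reference taken below is handed to the caller through the returned future;
+        // the caller (see StreamReader.Block#release) must release it after consuming the records.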
+        dataBlock.retain();
         dataBlock.dataFuture().whenComplete((db, ex) -> {
             if (ex != null) {
                 cf.completeExceptionally(ex);
                 return;
             }
-            db.retain();
             cf.complete(db);
         });
         return cf;
@@ -143,11 +160,14 @@ private void read(ObjectReader reader, DataBlock dataBlock, EventLoop eventLoop)
         reader.retain();
         boolean acquired = sizeLimiter.acquire(dataBlock.dataBlockIndex().size(), () -> {
             reader.read(dataBlock.dataBlockIndex()).whenCompleteAsync((rst, ex) -> {
+                StorageOperationStats.getInstance().blockCacheReadS3Throughput.add(MetricsLevel.INFO, dataBlock.dataBlockIndex().size());
                 reader.release();
+                DataBlockGroupKey key = new DataBlockGroupKey(dataBlock.objectId(), dataBlock.dataBlockIndex());
                 if (ex != null) {
                     dataBlock.completeExceptionally(ex);
+                    blocks.remove(key, dataBlock);
                 } else {
-                    lru.put(new DataBlockGroupKey(dataBlock.objectId(), dataBlock.dataBlockIndex()), dataBlock);
+                    lru.put(key, dataBlock);
                     dataBlock.complete(rst);
                 }
                 if (sizeLimiter.requiredRelease()) {
@@ -173,18 +193,8 @@ void evict() {
     private void evict0() {
         // TODO: avoid awake more tasks than necessary
         while (sizeLimiter.requiredRelease()) {
-            Map.Entry<DataBlockGroupKey, DataBlock> entry = null;
-            if (!inactive.isEmpty()) {
-                Iterator<Map.Entry<DataBlockGroupKey, DataBlock>> it = inactive.entrySet().iterator();
-                if (it.hasNext()) {
-                    entry = it.next();
-                    it.remove();
-                    lru.remove(entry.getKey());
-                }
-            }
-            if (entry == null) {
-                entry = lru.pop();
-            }
+            Map.Entry<DataBlockGroupKey, DataBlock> entry;
+            entry = lru.pop();
             if (entry == null) {
                 break;
             }
@@ -192,6 +202,7 @@ private void evict0() {
             DataBlock dataBlock = entry.getValue();
             if (blocks.remove(key, dataBlock)) {
                 dataBlock.free();
+                StorageOperationStats.getInstance().blockCacheBlockEvictThroughput.add(MetricsLevel.INFO, dataBlock.dataBlockIndex().size());
             } else {
                 LOGGER.error("[BUG] duplicated free data block {}", dataBlock);
             }
@@ -199,14 +210,13 @@ private void evict0() {
     }

     public void markUnread(DataBlock dataBlock) {
-        DataBlockGroupKey key = new DataBlockGroupKey(dataBlock.objectId(), dataBlock.dataBlockIndex());
-        inactive.remove(key, dataBlock);
     }

     public void markRead(DataBlock dataBlock) {
         DataBlockGroupKey key = new DataBlockGroupKey(dataBlock.objectId(), dataBlock.dataBlockIndex());
-        if (dataBlock == blocks.get(key)) {
-            inactive.put(key, dataBlock);
+        if (blocks.remove(key, dataBlock)) {
+            lru.remove(key);
+            dataBlock.free();
         }
     }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java
index 7cdf242c8f..67842641f3 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java
@@ -18,6 +18,9 @@
 import com.automq.stream.s3.exceptions.BlockNotContinuousException;
 import com.automq.stream.s3.exceptions.ObjectNotExistException;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metrics.MetricsLevel;
+import com.automq.stream.s3.metrics.TimerUtil;
+import com.automq.stream.s3.metrics.stats.StorageOperationStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.objects.ObjectManager;
 import com.automq.stream.utils.FutureUtil;
@@ -49,7 +52,9 @@ public class StreamReader {
     private static final int DEFAULT_READAHEAD_SIZE = 1024 * 1024 / 2;
     private static final int MAX_READAHEAD_SIZE = 32 * 1024 * 1024;
     private static final long READAHEAD_RESET_COLD_DOWN_MILLS = TimeUnit.MINUTES.toMillis(1);
-    private static final LogSuppressor LOG_SUPPRESSOR = new LogSuppressor(LOGGER, 30000);
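+    // Readahead only proceeds when the cache has this much spare capacity beyond the next window (see Readahead#tryReadahead below).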
+    private static final long READAHEAD_AVAILABLE_BYTES_THRESHOLD = 32L * 1024 * 1024;
+    private static final LogSuppressor READAHEAD_RESET_LOG_SUPPRESSOR = new LogSuppressor(LOGGER, 30000);
+    private static final LogSuppressor BLOCKS_RESET_LOG_SUPPRESSOR = new LogSuppressor(LOGGER, 30000);
     // visible to test
     final NavigableMap<Long, Block> blocksMap = new TreeMap<>();
     Block lastBlock = null;
@@ -65,6 +70,8 @@ public class StreamReader {
     private CompletableFuture<Map<Long, Block>> inflightLoadIndexCf;
     private long lastAccessTimestamp = System.currentTimeMillis();

+    private boolean closed = false;
+
     public StreamReader(
         long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager,
@@ -82,7 +89,11 @@ public StreamReader(
     }

     public CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, int maxBytes) {
-        return read(startOffset, endOffset, maxBytes, 1);
+        try {
+            return read(startOffset, endOffset, maxBytes, 1);
+        } catch (Throwable e) {
+            return FutureUtil.failedFuture(e);
+        }
     }

     CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, int maxBytes, int leftRetries) {
@@ -94,17 +105,19 @@ CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, int maxB
             Throwable cause = FutureUtil.cause(ex);
             if (cause != null) {
                 readContext.records.forEach(StreamRecordBatch::release);
-                if (leftRetries > 0) {
-                    if (cause instanceof ObjectNotExistException || cause instanceof NoSuchKeyException || cause instanceof BlockNotContinuousException) {
-                        // The cached blocks maybe invalid after object compaction, so we need to reset the blocks and retry read
-                        resetBlocks();
-                        FutureUtil.propagate(read(startOffset, endOffset, maxBytes, leftRetries - 1), retCf);
-                    }
+                if (leftRetries > 0 && (cause instanceof ObjectNotExistException || cause instanceof NoSuchKeyException || cause instanceof BlockNotContinuousException)) {
+                    // The cached blocks may be invalid after object compaction, so we need to reset the blocks and retry the read
+                    resetBlocks();
+                    FutureUtil.propagate(read(startOffset, endOffset, maxBytes, leftRetries - 1), retCf);
                 } else {
+                    for (Block block : readContext.blocks) {
+                        block.release();
+                    }
                     retCf.completeExceptionally(cause);
                 }
             } else {
                 afterRead(rst, readContext);
+                StorageOperationStats.getInstance().blockCacheReadStreamThroughput.add(MetricsLevel.INFO, rst.sizeInBytes());
                 retCf.complete(rst);
             }
         }, retCf, LOGGER, "read"));
@@ -120,6 +133,7 @@ public long lastAccessTimestamp() {
     }

     public void close() {
+        closed = true;
         blocksMap.forEach((k, v) -> {
             if (v.data != null) {
                 v.data.markRead();
@@ -182,7 +196,6 @@ void read0(ReadContext ctx, long startOffset, long endOffset, int maxBytes) {
                 read0(ctx, nextStartOffset, endOffset, remainingSize);
             }
         }).whenComplete((nil, ex) -> {
-            blocks.forEach(Block::release);
             if (ex != null) {
                 ctx.cf.completeExceptionally(ex);
             }
@@ -205,7 +218,12 @@ void afterRead(ReadDataBlock readDataBlock, ReadContext ctx) {
             }
         }
         // #getDataBlock will invoke DataBlock#markUnread
-        ctx.blocks.forEach(b -> b.data.markRead());
+        for (Block block : ctx.blocks) {
+            block.release();
+            if (block.index.endOffset() <= nextReadOffset) {
+                block.markRead();
+            }
+        }
         // try readahead to accelerate the next read
         readahead.tryReadahead();
     }
@@ -218,7 +236,7 @@ private CompletableFuture<List<Block>> getBlocks(long startOffset, long endOffse
             context.cf.completeExceptionally(ex);
         }
         context.cf.exceptionally(ex -> {
-            context.blocks.forEach(b -> b.loadCf.thenAccept(nil -> b.release()));
+            context.blocks.forEach(Block::release);
             return null;
         });
         return context.cf;
@@ -239,6 +257,7 @@ private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset,
         if (!objectManager.isObjectExist(objectId)) {
             // The cached block's object maybe deleted by the compaction. So we need to check the object exist.
             ctx.cf.completeExceptionally(new ObjectNotExistException(objectId));
+            return;
         }
         DataBlockIndex index = block.index;
         if (!firstBlock || index.startOffset() == startOffset) {
@@ -290,10 +309,12 @@ private CompletableFuture<Map<Long, Block>> loadMoreBlocksWithoutData() {
         long nextLoadingOffset = calWindowBlocksEndOffset();
         AtomicLong nextFindStartOffset = new AtomicLong(nextLoadingOffset);
         Map<Long, Block> newDataBlockIndex = new HashMap<>();
+        TimerUtil time = new TimerUtil();
         // 1. get objects
         CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, nextLoadingOffset, -1L, GET_OBJECT_STEP);
         // 2. get block indexes from objects
         CompletableFuture<Void> findBlockIndexesCf = getObjectsCf.thenComposeAsync(objects -> {
+            StorageOperationStats.getInstance().getIndicesTimeGetObjectStats.record(time.elapsedAndResetAs(TimeUnit.NANOSECONDS));
             CompletableFuture<Void> prevCf = CompletableFuture.completedFuture(null);
             for (S3ObjectMetadata objectMetadata : objects) {
                 ObjectReader objectReader = objectReaderFactory.apply(objectMetadata);
@@ -325,6 +346,7 @@ private CompletableFuture<Map<Long, Block>> loadMoreBlocksWithoutData() {
                 inflightLoadIndexCf.completeExceptionally(ex);
                 return;
             }
+            StorageOperationStats.getInstance().getIndicesTimeFindIndexStats.record(time.elapsedAs(TimeUnit.NANOSECONDS));
             CompletableFuture<Map<Long, Block>> cf = inflightLoadIndexCf;
             inflightLoadIndexCf = null;
             cf.complete(newDataBlockIndex);
@@ -341,11 +363,14 @@ private long calWindowBlocksEndOffset() {
     }

     private void handleBlockFree(Block block) {
+        if (closed) {
+            return;
+        }
         Block blockInMap = blocksMap.get(block.index.startOffset());
         if (block == blockInMap) {
             // The unread block is evicted; It means the cache is full, we need to reset the readahead.
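+            // Scaling the readahead back here keeps prefetch from evicting blocks before they are consumed.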
             readahead.reset();
-            LOG_SUPPRESSOR.warn("The unread block is evicted, please increase the block cache size");
+            READAHEAD_RESET_LOG_SUPPRESSOR.warn("The unread block is evicted, please increase the block cache size");
         }
     }
@@ -353,10 +378,12 @@ private void resetBlocks() {
         blocksMap.clear();
         lastBlock = null;
         loadedBlockIndexEndOffset = 0L;
+        BLOCKS_RESET_LOG_SUPPRESSOR.info("The stream reader's blocks are reset because of object compaction");
     }

     /**
      * Put block into the blocks
+     *
      * @param block {@link Block}
      * @return if the block is continuous to the last block, it will return true
      */
@@ -371,15 +398,15 @@ private boolean putBlock(Block block) {
     }

     static class GetBlocksContext {
-        List<Block> blocks = new ArrayList<>();
-        CompletableFuture<List<Block>> cf = new CompletableFuture<>();
+        final List<Block> blocks = new ArrayList<>();
+        final CompletableFuture<List<Block>> cf = new CompletableFuture<>();
     }

     static class ReadContext {
-        List<StreamRecordBatch> records = new LinkedList<>();
-        List<Block> blocks = new ArrayList<>();
+        final List<StreamRecordBatch> records = new LinkedList<>();
+        final List<Block> blocks = new ArrayList<>();
+        final CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
         CacheAccessType accessType = CacheAccessType.BLOCK_CACHE_HIT;
-        CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
     }

     class Block {
@@ -388,6 +415,7 @@ class Block {
         DataBlock data;
         CompletableFuture<Void> loadCf;
         Throwable exception;
+        boolean released = false;

         public Block(S3ObjectMetadata metadata, DataBlockIndex index) {
             this.metadata = metadata;
@@ -417,8 +445,21 @@ public Block newBlockWithData() {
         }

         public void release() {
+            if (released) {
+                LOGGER.error("[BUG] duplicated release", new IllegalStateException());
+                return;
+            }
+            released = true;
+            loadCf.whenComplete((nil, ex) -> {
+                if (data != null) {
+                    data.release();
+                }
+            });
+        }
+
+        public void markRead() {
             if (data != null) {
-                data.release();
+                data.markRead();
             }
         }
     }
@@ -453,12 +494,13 @@ public void tryReadahead() {
             // if the user read doesn't reach the readahead mark, we don't need to readahead
             return;
         }
+        if (dataBlockCache.available() < nextReadaheadSize + READAHEAD_AVAILABLE_BYTES_THRESHOLD) {
+            return;
+        }
         readaheadMarkOffset = nextReadaheadOffset;
         inflightReadaheadCf = getBlocks(nextReadaheadOffset, -1L, nextReadaheadSize).thenAccept(blocks -> {
             nextReadaheadOffset = blocks.isEmpty() ? nextReadaheadOffset : blocks.get(blocks.size() - 1).index.endOffset();
-            for (Block block : blocks) {
-                block.loadCf.whenComplete((nil, ex) -> block.release());
-            }
+            blocks.forEach(Block::release);
         });
         // For get block indexes and load data block are sync success,
         // the whenComplete will invoke first before assign CompletableFuture to inflightReadaheadCf
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java
index 400c6c35f5..1a453de169 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java
@@ -35,7 +35,7 @@ public class StreamReaders implements S3BlockCache {
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamReaders.class);
     private static final int MAX_OBJECT_READER_SIZE = 100 * 1024 * 1024; // 100MB;
-    private static final long STREAM_READER_EXPIRED_MILLS = TimeUnit.MINUTES.toMillis(3);
+    private static final long STREAM_READER_EXPIRED_MILLS = TimeUnit.MINUTES.toMillis(1);
     private static final long STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS = TimeUnit.MINUTES.toMillis(1);
     private final Cache[] caches;
     private final DataBlockCache dataBlockCache;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
index 490fc949f7..78cd42ee27 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
@@ -104,6 +104,7 @@ public class S3StreamMetricsConstant {
     public static final String WRITE_S3_LIMITER_TIME_METRIC_NAME = "write_s3_limiter_time";
     public static final String GET_INDEX_TIME_METRIC_NAME = "get_index_time";
     public static final String READ_BLOCK_CACHE_METRIC_NAME = "read_block_cache_stage_time";
+    public static final String READ_BLOCK_CACHE_THROUGHPUT_METRIC_NAME = "block_cache_ops_throughput";
     public static final AttributeKey<String> LABEL_OPERATION_TYPE = AttributeKey.stringKey("operation_type");
     public static final AttributeKey<String> LABEL_OPERATION_NAME = AttributeKey.stringKey("operation_name");
     public static final AttributeKey<String> LABEL_SIZE_NAME = AttributeKey.stringKey("size");
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
index e9cd2c4567..514bb58ad5 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
@@ -23,6 +23,7 @@
 import com.automq.stream.s3.network.ThrottleStrategy;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricsRegistry;
+import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.LongCounter;
 import io.opentelemetry.api.metrics.Meter;
@@ -89,6 +90,8 @@ public class S3StreamMetricsManager {
     private static Supplier<Long> deltaWalTrimmedOffsetSupplier = () -> 0L;
     private static Supplier<Long> deltaWALCacheSizeSupplier = () -> 0L;
     private static Supplier<Long> blockCacheSizeSupplier = () -> 0L;
+    private static LongCounter blockCacheOpsThroughput = new NoopLongCounter();
+
     private static Map<Integer, Supplier<Integer>> availableInflightS3ReadQuotaSupplier = new ConcurrentHashMap<>();
     private static Map<Integer, Supplier<Integer>> availableInflightS3WriteQuotaSupplier = new ConcurrentHashMap<>();
     private static Supplier<Integer> inflightWALUploadTasksCountSupplier = () -> 0;
@@ -214,7 +217,7 @@ public static void initMetrics(Meter meter, String prefix) {
             .setUnit("bytes")
             .ofLongs()
             .buildWithCallback(result -> {
-                if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel())) {
+                if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
                     result.record(deltaWALCacheSizeSupplier.get(), metricsConfig.getBaseAttributes());
                 }
             });
@@ -223,7 +226,7 @@ public static void initMetrics(Meter meter, String prefix) {
             .setUnit("bytes")
             .ofLongs()
             .buildWithCallback(result -> {
-                if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel())) {
+                if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
                     result.record(blockCacheSizeSupplier.get(), metricsConfig.getBaseAttributes());
                 }
             });
@@ -313,6 +316,10 @@ public static void initMetrics(Meter meter, String prefix) {
                 result.record(maxPendingStreamFetchLatency(), metricsConfig.getBaseAttributes());
             }
         });
+        blockCacheOpsThroughput = meter.counterBuilder(prefix + S3StreamMetricsConstant.READ_BLOCK_CACHE_THROUGHPUT_METRIC_NAME)
+            .setDescription("Block cache operation throughput")
+            .setUnit("bytes")
+            .build();
     }

     public static void registerNetworkLimiterSupplier(AsyncNetworkBandwidthLimiter.Type type,
@@ -516,6 +523,16 @@ public static YammerHistogramMetric buildReadBlockCacheStageTime(MetricName metr
         }
     }

+    public static CounterMetric buildBlockCacheOpsThroughputMetric(String ops) {
+        synchronized (BASE_ATTRIBUTES_LISTENERS) {
+            CounterMetric metric = new CounterMetric(metricsConfig, Attributes.builder()
+                .put(AttributeKey.stringKey("ops"), ops)
+                .build(), blockCacheOpsThroughput);
+            BASE_ATTRIBUTES_LISTENERS.add(metric);
+            return metric;
+        }
+    }
+
     public static YammerHistogramMetric buildReadS3LimiterTimeMetric(MetricName metricName, MetricsLevel metricsLevel,
         int index) {
         synchronized (BASE_ATTRIBUTES_LISTENERS) {
             YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig,
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java
index d1ade0b805..1bed71913b 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/StorageOperationStats.java
@@ -16,12 +16,16 @@
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.operations.S3Operation;
 import com.automq.stream.s3.metrics.operations.S3Stage;
+import com.automq.stream.s3.metrics.wrapper.CounterMetric;
 import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric;
 import com.yammer.metrics.core.MetricName;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;

+import static com.automq.stream.s3.metrics.S3StreamMetricsConstant.GET_INDEX_TIME_METRIC_NAME;
+import static com.automq.stream.s3.metrics.S3StreamMetricsConstant.READ_BLOCK_CACHE_METRIC_NAME;
+
 public class StorageOperationStats {
     private volatile static StorageOperationStats instance = null;
@@ -93,14 +97,15 @@
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STAGE_PUT_BLOCK_CACHE);
     public final YammerHistogramMetric getIndicesTimeGetObjectStats = S3StreamMetricsManager.buildGetIndexTimeMetric(
-        new MetricName(StorageOperationStats.class, "GetIndexTime-" + S3StreamMetricsConstant.LABEL_STAGE_GET_OBJECTS),
+        new MetricName(StorageOperationStats.class, GET_INDEX_TIME_METRIC_NAME + S3StreamMetricsConstant.LABEL_STAGE_GET_OBJECTS),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STAGE_GET_OBJECTS);
     public final YammerHistogramMetric getIndicesTimeFindIndexStats = S3StreamMetricsManager.buildGetIndexTimeMetric(
-        new MetricName(StorageOperationStats.class, "GetIndexTime-" + S3StreamMetricsConstant.LABEL_STAGE_FIND_INDEX),
+        new MetricName(StorageOperationStats.class, GET_INDEX_TIME_METRIC_NAME + S3StreamMetricsConstant.LABEL_STAGE_FIND_INDEX),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STAGE_FIND_INDEX);
     public final YammerHistogramMetric getIndicesTimeComputeStats = S3StreamMetricsManager.buildGetIndexTimeMetric(
-        new MetricName(StorageOperationStats.class, "GetIndexTime-" + S3StreamMetricsConstant.LABEL_STAGE_COMPUTE),
+        new MetricName(StorageOperationStats.class, GET_INDEX_TIME_METRIC_NAME + S3StreamMetricsConstant.LABEL_STAGE_COMPUTE),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STAGE_COMPUTE);
+
     private final Map<Integer, YammerHistogramMetric> readS3LimiterStatsMap = new ConcurrentHashMap<>();
     private final Map<Integer, YammerHistogramMetric> writeS3LimiterStatsMap = new ConcurrentHashMap<>();
@@ -111,38 +116,44 @@ public class StorageOperationStats {
     public final YammerHistogramMetric readAheadSyncSizeStats = S3StreamMetricsManager.buildReadAheadSizeMetric(
         MetricsLevel.INFO, S3StreamMetricsConstant.LABEL_STATUS_SYNC);
     public final YammerHistogramMetric readAheadAsyncSizeStats = S3StreamMetricsManager.buildReadAheadSizeMetric(
         MetricsLevel.INFO, S3StreamMetricsConstant.LABEL_STATUS_ASYNC);
     public final YammerHistogramMetric readBlockCacheStageMissWaitInflightTimeStats = S3StreamMetricsManager.buildReadBlockCacheStageTime(
-        new MetricName(StorageOperationStats.class, "ReadBlockStageTime" + S3StreamMetricsConstant.LABEL_STATUS_MISS
+        new MetricName(StorageOperationStats.class, READ_BLOCK_CACHE_METRIC_NAME + S3StreamMetricsConstant.LABEL_STATUS_MISS
             + S3StreamMetricsConstant.LABEL_STAGE_WAIT_INFLIGHT),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STATUS_MISS, S3StreamMetricsConstant.LABEL_STAGE_WAIT_INFLIGHT);
     public final YammerHistogramMetric readBlockCacheStageMissReadCacheTimeStats = S3StreamMetricsManager.buildReadBlockCacheStageTime(
-        new MetricName(StorageOperationStats.class, "ReadBlockStageTime" + S3StreamMetricsConstant.LABEL_STATUS_MISS
+        new MetricName(StorageOperationStats.class, READ_BLOCK_CACHE_METRIC_NAME + S3StreamMetricsConstant.LABEL_STATUS_MISS
             + S3StreamMetricsConstant.LABEL_STAGE_READ_CACHE),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STATUS_MISS, S3StreamMetricsConstant.LABEL_STAGE_READ_CACHE);
     public final YammerHistogramMetric readBlockCacheStageMissReadAheadTimeStats = S3StreamMetricsManager.buildReadBlockCacheStageTime(
-        new MetricName(StorageOperationStats.class, "ReadBlockStageTime" + S3StreamMetricsConstant.LABEL_STATUS_MISS
+        new MetricName(StorageOperationStats.class, READ_BLOCK_CACHE_METRIC_NAME + S3StreamMetricsConstant.LABEL_STATUS_MISS
             + S3StreamMetricsConstant.LABEL_STAGE_READ_AHEAD),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STATUS_MISS, S3StreamMetricsConstant.LABEL_STAGE_READ_AHEAD);
     public final YammerHistogramMetric readBlockCacheStageMissReadS3TimeStats = S3StreamMetricsManager.buildReadBlockCacheStageTime(
-        new MetricName(StorageOperationStats.class, "ReadBlockStageTime" + S3StreamMetricsConstant.LABEL_STATUS_MISS
+        new MetricName(StorageOperationStats.class, READ_BLOCK_CACHE_METRIC_NAME + S3StreamMetricsConstant.LABEL_STATUS_MISS
            + S3StreamMetricsConstant.LABEL_STAGE_READ_S3),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STATUS_MISS, S3StreamMetricsConstant.LABEL_STAGE_READ_S3);
     public final YammerHistogramMetric readBlockCacheStageHitWaitInflightTimeStats = S3StreamMetricsManager.buildReadBlockCacheStageTime(
-        new MetricName(StorageOperationStats.class, "ReadBlockStageTime" + S3StreamMetricsConstant.LABEL_STATUS_HIT
+        new MetricName(StorageOperationStats.class, READ_BLOCK_CACHE_METRIC_NAME + S3StreamMetricsConstant.LABEL_STATUS_HIT
             + S3StreamMetricsConstant.LABEL_STAGE_WAIT_INFLIGHT),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STATUS_HIT, S3StreamMetricsConstant.LABEL_STAGE_WAIT_INFLIGHT);
     public final YammerHistogramMetric readBlockCacheStageHitReadCacheTimeStats = S3StreamMetricsManager.buildReadBlockCacheStageTime(
-        new MetricName(StorageOperationStats.class, "ReadBlockStageTime" + S3StreamMetricsConstant.LABEL_STATUS_HIT
+        new MetricName(StorageOperationStats.class, READ_BLOCK_CACHE_METRIC_NAME + S3StreamMetricsConstant.LABEL_STATUS_HIT
             + S3StreamMetricsConstant.LABEL_STAGE_READ_CACHE),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STATUS_HIT, S3StreamMetricsConstant.LABEL_STAGE_READ_CACHE);
     public final YammerHistogramMetric readBlockCacheStageHitReadAheadTimeStats = S3StreamMetricsManager.buildReadBlockCacheStageTime(
-        new MetricName(StorageOperationStats.class, "ReadBlockStageTime" + S3StreamMetricsConstant.LABEL_STATUS_HIT
+        new MetricName(StorageOperationStats.class, READ_BLOCK_CACHE_METRIC_NAME + S3StreamMetricsConstant.LABEL_STATUS_HIT
             + S3StreamMetricsConstant.LABEL_STAGE_READ_AHEAD),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STATUS_HIT, S3StreamMetricsConstant.LABEL_STAGE_READ_AHEAD);
     public final YammerHistogramMetric readBlockCacheStageHitReadS3TimeStats = S3StreamMetricsManager.buildReadBlockCacheStageTime(
-        new MetricName(StorageOperationStats.class, "ReadBlockStageTime" + S3StreamMetricsConstant.LABEL_STATUS_HIT
+        new MetricName(StorageOperationStats.class, READ_BLOCK_CACHE_METRIC_NAME + S3StreamMetricsConstant.LABEL_STATUS_HIT
             + S3StreamMetricsConstant.LABEL_STAGE_READ_S3),
         MetricsLevel.DEBUG, S3StreamMetricsConstant.LABEL_STATUS_HIT, S3StreamMetricsConstant.LABEL_STAGE_READ_S3);
+    public final CounterMetric blockCacheReadS3Throughput = S3StreamMetricsManager.buildBlockCacheOpsThroughputMetric("read_s3");
+    public final CounterMetric blockCacheBlockHitThroughput = S3StreamMetricsManager.buildBlockCacheOpsThroughputMetric("block_hit");
+    public final CounterMetric blockCacheBlockMissThroughput = S3StreamMetricsManager.buildBlockCacheOpsThroughputMetric("block_miss");
+    public final CounterMetric blockCacheBlockEvictThroughput = S3StreamMetricsManager.buildBlockCacheOpsThroughputMetric("block_evict");
+    public final CounterMetric blockCacheReadStreamThroughput = S3StreamMetricsManager.buildBlockCacheOpsThroughputMetric("read_stream");
+
     private StorageOperationStats() {
     }
diff --git a/s3stream/src/main/java/com/automq/stream/utils/LogSuppressor.java b/s3stream/src/main/java/com/automq/stream/utils/LogSuppressor.java
index 2070e0253f..d1296e6629 100644
--- a/s3stream/src/main/java/com/automq/stream/utils/LogSuppressor.java
+++ b/s3stream/src/main/java/com/automq/stream/utils/LogSuppressor.java
@@ -42,4 +42,20 @@ public void warn(String message, Object... args) {
         }
     }

+    public void info(String message, Object... args) {
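+        // Same suppression scheme as warn(): emit at most one log per intervalMills and count how many were suppressed in between.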
+        long now = System.currentTimeMillis();
+        for (; ; ) {
+            long last = lastLogTime.get();
+            if (now - last > intervalMills) {
+                if (lastLogTime.compareAndSet(last, now)) {
+                    logger.info("[SUPPRESSED_TIME=" + supressedCount.getAndSet(0) + "] " + message, args);
+                    break;
+                }
+            } else {
+                supressedCount.incrementAndGet();
+                break;
+            }
+        }
+    }
+
 }
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/DataBlockCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/DataBlockCacheTest.java
index 957a234dfa..0569c55827 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/DataBlockCacheTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/DataBlockCacheTest.java
@@ -152,7 +152,7 @@ public void testGetBlock_unread() throws ExecutionException, InterruptedExceptio
             }));
         }).get();
         eventLoops[0].submit(() -> {
-            assertEquals(2, cache.caches[0].blocks.size());
+            assertEquals(1, cache.caches[0].blocks.size());
             cache.getBlock(objectReader, new DataBlockIndex(STREAM_ID, 0, 200, 1, 0, 100));
         }).get();
         cf2.get().get().freeFuture().get(1, TimeUnit.SECONDS);
@@ -163,6 +163,7 @@ public void testGetBlock_unread() throws ExecutionException, InterruptedExceptio
             assertEquals(2, cache.lru.size());
             assertTrue(cache.blocks.containsKey(new DataBlockCache.DataBlockGroupKey(233, new DataBlockIndex(STREAM_ID, 0, 10, 1, 0, 100))));
             assertTrue(cache.blocks.containsKey(new DataBlockCache.DataBlockGroupKey(233, new DataBlockIndex(STREAM_ID, 0, 200, 1, 0, 100))));
+            assertEquals(1024 - 100 - 100, this.cache.sizeLimiter.permits());
         }).get();

         AtomicReference<CompletableFuture<DataBlock>> cf3 = new AtomicReference<>();
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReaderTest.java
index 804f0e240e..8d41540d77 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReaderTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReaderTest.java
@@ -142,7 +142,6 @@ public void testRead_withReadahead() throws ExecutionException, InterruptedExcep
         assertEquals(8L, streamReader.readahead.nextReadaheadOffset);
         assertEquals(3L, streamReader.readahead.readaheadMarkOffset);
         assertEquals(1024L * 1024, streamReader.readahead.nextReadaheadSize);
-        assertEquals(3, dataBlockCache.caches[0].inactive.size());

         eventLoops[0].submit(() -> readCf.set(streamReader.read(3L, 29L, 1))).get();
         rst = readCf.get().get();
@@ -158,7 +157,6 @@ public void testRead_withReadahead() throws ExecutionException, InterruptedExcep
         assertEquals(12L, streamReader.readahead.nextReadaheadOffset);
         assertEquals(8L, streamReader.readahead.readaheadMarkOffset);
         assertEquals(1024L * 1024, streamReader.readahead.nextReadaheadSize);
-        assertEquals(4, dataBlockCache.caches[0].inactive.size());

         eventLoops[0].submit(() -> readCf.set(streamReader.read(4L, 29L, 1))).get();
         rst = readCf.get().get();
@@ -174,7 +172,6 @@ public void testRead_withReadahead() throws ExecutionException, InterruptedExcep
         assertEquals(12L, streamReader.readahead.nextReadaheadOffset);
         assertEquals(8L, streamReader.readahead.readaheadMarkOffset);
         assertEquals(1024L * 1024, streamReader.readahead.nextReadaheadSize);
-        assertEquals(5, dataBlockCache.caches[0].inactive.size());

         eventLoops[0].submit(() -> readCf.set(streamReader.read(5L, 14L, Integer.MAX_VALUE))).get();
         rst = readCf.get().get();
@@ -184,11 +181,12 @@ public void testRead_withReadahead() throws ExecutionException, InterruptedExcep
         // - load more index
         // - readahead from offset = 14...22
         verify(dataBlockCache, timeout(1000).times(14 + 9 + 8)).getBlock(any(), any());
-        assertEquals(34L, streamReader.loadedBlockIndexEndOffset);
-        assertEquals(14L + 8, streamReader.readahead.nextReadaheadOffset);
-        assertEquals(14L, streamReader.readahead.readaheadMarkOffset);
-        assertEquals(1024L * 1024 * 2, streamReader.readahead.nextReadaheadSize);
-        assertEquals(14, dataBlockCache.caches[0].inactive.size());
+        eventLoops[0].submit(() -> {
+            assertEquals(34L, streamReader.loadedBlockIndexEndOffset);
+            assertEquals(14L + 8, streamReader.readahead.nextReadaheadOffset);
+            assertEquals(14L, streamReader.readahead.readaheadMarkOffset);
+            assertEquals(1024L * 1024 * 2, streamReader.readahead.nextReadaheadSize);
+        }).get();

         when(objectManager.isObjectExist(anyLong())).thenReturn(false);
@@ -211,7 +209,6 @@ public void testRead_withReadahead() throws ExecutionException, InterruptedExcep
             }
         }).when(objectManager).isObjectExist(anyLong());
         eventLoops[0].submit(() -> readCf.set(streamReader.read(14L, 15L, Integer.MAX_VALUE))).get();
-        rst = readCf.get().get(1, TimeUnit.SECONDS);
     }
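Reviewer note (not part of the patch): DataBlock now extends Netty's AbstractReferenceCounted instead of delegating retain()/release() directly to the underlying dataBlockGroup, so the buffer is freed exactly once no matter how many readers hold the block. A minimal, self-contained sketch of that pattern is below; class and variable names are illustrative, and it only assumes Netty 4.x on the classpath.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;

// Illustrative stand-in for DataBlock: the backing buffer is released exactly once,
// when the initial reference plus every retain() has been matched by a release().
class RefCountedBlock extends AbstractReferenceCounted {
    private final ByteBuf data;

    RefCountedBlock(ByteBuf data) {
        this.data = data;
    }

    @Override
    protected void deallocate() {
        // Invoked by AbstractReferenceCounted when refCnt() drops to zero.
        data.release();
    }

    @Override
    public ReferenceCounted touch(Object hint) {
        return this;
    }

    public static void main(String[] args) {
        RefCountedBlock block = new RefCountedBlock(Unpooled.buffer(16));
        block.retain();   // a second reader takes a reference (refCnt 1 -> 2)
        block.release();  // first reader done (refCnt 2 -> 1), buffer still alive
        block.release();  // last reference dropped (refCnt 1 -> 0) -> deallocate()
        System.out.println("refCnt=" + block.refCnt()); // prints refCnt=0
    }
}

Under this contract, Cache#getBlock0 retains on behalf of the caller before completing the future, while markRead and evict0 drop the cache's own reference via DataBlock#free, so the pooled buffer is returned only after both the cache and all in-flight reads are done with it.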