From f2e4236ab7ad32ed4bbcbe108b0ebd58adf495ec Mon Sep 17 00:00:00 2001
From: "Xu Han@AutoMQ"
Date: Fri, 12 Apr 2024 19:36:11 +0800
Subject: [PATCH] feat(issues1087): add stream readers (#1116)

Signed-off-by: Robin Han
---
 .../kafka/log/stream/s3/DefaultS3Client.java  |   4 +-
 .../s3/cache/blockcache/AsyncSemaphore.java   |   4 +-
 .../stream/s3/cache/blockcache/DataBlock.java |   2 +
 .../s3/cache/blockcache/DataBlockCache.java   |  17 +-
 .../s3/cache/blockcache/StreamReader.java     |  32 +++-
 .../s3/cache/blockcache/StreamReaders.java    | 171 ++++++++++++++++++
 .../automq/stream/utils/LogSuppressor.java    |  45 +++++
 .../java/com/automq/stream/utils/Systems.java |  16 ++
 8 files changed, 272 insertions(+), 19 deletions(-)
 create mode 100644 s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java
 create mode 100644 s3stream/src/main/java/com/automq/stream/utils/LogSuppressor.java
 create mode 100644 s3stream/src/main/java/com/automq/stream/utils/Systems.java

diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
index f7dcd7de0e..da3936aca6 100644
--- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
@@ -19,8 +19,8 @@
 import com.automq.stream.s3.Config;
 import com.automq.stream.s3.S3Storage;
 import com.automq.stream.s3.S3StreamClient;
-import com.automq.stream.s3.cache.DefaultS3BlockCache;
 import com.automq.stream.s3.cache.S3BlockCache;
+import com.automq.stream.s3.cache.blockcache.StreamReaders;
 import com.automq.stream.s3.compact.CompactionManager;
 import com.automq.stream.s3.failover.Failover;
 import com.automq.stream.s3.failover.FailoverRequest;
@@ -94,7 +94,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) {
         this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext);
         this.streamManager = new ControllerStreamManager(this.metadataManager, this.requestSender, kafkaConfig);
         this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, kafkaConfig);
-        this.blockCache = new DefaultS3BlockCache(this.config, objectManager, s3Operator);
+        this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, s3Operator);
         this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionS3Operator);
         this.writeAheadLog = BlockWALService.builder(this.config.walPath(), this.config.walCapacity()).config(this.config).build();
         this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, s3Operator);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/AsyncSemaphore.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/AsyncSemaphore.java
index fd192bd237..e6d2e11c80 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/AsyncSemaphore.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/AsyncSemaphore.java
@@ -54,8 +54,8 @@ public synchronized boolean acquire(long requiredPermits, Supplier<CompletableFu
     }
 
-    public synchronized boolean hasPermits() {
-        return permits > 0;
+    public synchronized boolean requiredRelease() {
+        return permits <= 0 || !tasks.isEmpty();
     }
 
     public synchronized long permits() {
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java
index 27654bae3b..03df398c4e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java
@@ -121,6 +121,8 @@ public List<StreamRecordBatch> getRecords(long startOffset, long endOffset, int
                 break;
             }
             continue;
+        } else {
+            recordBatch.release();
         }
         if (recordBatch.getBaseOffset() >= endOffset) {
             break;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java
index 6e0a970daf..3b2404ddc9 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java
@@ -14,7 +14,7 @@
 import com.automq.stream.s3.DataBlockIndex;
 import com.automq.stream.s3.ObjectReader;
 import com.automq.stream.s3.cache.LRUCache;
-import io.netty.channel.DefaultEventLoop;
+import io.netty.channel.EventLoop;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -35,7 +35,7 @@ public class DataBlockCache {
     final AsyncSemaphore sizeLimiter;
     private final long maxSize;
 
-    public DataBlockCache(long maxSize, DefaultEventLoop[] eventLoops) {
+    public DataBlockCache(long maxSize, EventLoop[] eventLoops) {
         this.maxSize = maxSize;
         this.sizeLimiter = new AsyncSemaphore(maxSize);
         this.caches = new Cache[eventLoops.length];
@@ -105,16 +105,13 @@ class Cache implements ReadStatusChangeListener {
     final Map<DataBlockGroupKey, DataBlock> blocks = new HashMap<>();
     final LRUCache<DataBlockGroupKey, DataBlock> lru = new LRUCache<>();
     final Map<DataBlockGroupKey, DataBlock> inactive = new HashMap<>();
-    private final DefaultEventLoop eventLoop;
+    private final EventLoop eventLoop;
 
-    public Cache(DefaultEventLoop eventLoop) {
+    public Cache(EventLoop eventLoop) {
         this.eventLoop = eventLoop;
     }
 
     public CompletableFuture<DataBlock> getBlock(ObjectReader objectReader, DataBlockIndex dataBlockIndex) {
-        if (Thread.currentThread().getId() != eventLoop.threadProperties().id()) {
-            throw new IllegalStateException("getBlock must be invoked in the same eventLoop");
-        }
         long objectId = objectReader.metadata().objectId();
         DataBlockGroupKey key = new DataBlockGroupKey(objectId, dataBlockIndex);
         DataBlock dataBlock = blocks.get(key);
@@ -139,7 +136,7 @@ public CompletableFuture<DataBlock> getBlock(ObjectReader objectReader, DataBloc
         return cf;
     }
 
-    private void read(ObjectReader reader, DataBlock dataBlock, DefaultEventLoop eventLoop) {
+    private void read(ObjectReader reader, DataBlock dataBlock, EventLoop eventLoop) {
         reader.retain();
         boolean acquired = sizeLimiter.acquire(dataBlock.dataBlockIndex().size(), () -> {
             reader.read(dataBlock.dataBlockIndex()).whenCompleteAsync((rst, ex) -> {
@@ -150,7 +147,7 @@ private void read(ObjectReader reader, DataBlock dataBlock, DefaultEventLoop eve
                 dataBlock.complete(rst);
             }
             lru.put(new DataBlockGroupKey(dataBlock.objectId(), dataBlock.dataBlockIndex()), dataBlock);
-            if (!sizeLimiter.hasPermits()) {
+            if (sizeLimiter.requiredRelease()) {
                 // In the described scenario, with maxSize set to 1, upon the sequential arrival of requests #getBlock(size=2) and #getBlock(3),
                 // #getBlock(3) will wait in the queue until permits are available.
                // If, after #getBlock(size=2) completes, permits are still lacking in the sizeLimiter, implying queued tasks,
@@ -175,7 +172,7 @@ void evict() {
     }
 
     private void evict0() {
-        while (!sizeLimiter.hasPermits()) {
+        while (sizeLimiter.requiredRelease()) {
             Map.Entry<DataBlockGroupKey, DataBlock> entry = null;
             if (!inactive.isEmpty()) {
                 Iterator<Map.Entry<DataBlockGroupKey, DataBlock>> it = inactive.entrySet().iterator();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java
index 9df711195c..1a8a28b6fc 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java
@@ -18,6 +18,7 @@
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.objects.ObjectManager;
+import com.automq.stream.utils.LogSuppressor;
 import io.netty.channel.EventLoop;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -40,6 +41,7 @@ public class StreamReader {
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamReader.class);
     private static final int DEFAULT_READAHEAD_SIZE = 1024 * 1024 / 2;
     private static final int MAX_READAHEAD_SIZE = 32 * 1024 * 1024;
+    private static final LogSuppressor LOG_SUPPRESSOR = new LogSuppressor(LOGGER, 30000);
     // visible to test
     final NavigableMap<Long, Block> blocksMap = new TreeMap<>();
     final Readahead readahead;
@@ -51,6 +53,7 @@ public class StreamReader {
     long nextReadOffset;
     long loadedBlockIndexEndOffset = 0L;
     private CompletableFuture> inflightLoadIndexCf;
+    private long lastAccessTimestamp = System.currentTimeMillis();
 
     public StreamReader(
         long streamId, long nextReadOffset, EventLoop eventLoop,
@@ -69,14 +72,32 @@ public StreamReader(
     }
 
     public CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, int maxBytes) {
-        // TODO: only allow one inflight read
+        lastAccessTimestamp = System.currentTimeMillis();
         ReadContext readContext = new ReadContext();
         read0(readContext, startOffset, endOffset, maxBytes);
-        readContext.cf.thenAccept(this::afterRead).exceptionally(ex -> {
-            readContext.records.forEach(StreamRecordBatch::release);
-            return null;
+        return readContext.cf.whenComplete((rst, ex) -> {
+            if (ex != null) {
+                readContext.records.forEach(StreamRecordBatch::release);
+            } else {
+                afterRead(rst);
+            }
+        });
+    }
+
+    public long nextReadOffset() {
+        return nextReadOffset;
+    }
+
+    public long lastAccessTimestamp() {
+        return lastAccessTimestamp;
+    }
+
+    public void close() {
+        blocksMap.forEach((k, v) -> {
+            if (v.data != null) {
+                v.data.markRead();
+            }
         });
-        return readContext.cf;
     }
 
     void read0(ReadContext ctx, long startOffset, long endOffset, int maxBytes) {
@@ -280,6 +301,7 @@ private void handleBlockFree(Block block) {
         if (block == blockInMap) {
             // The unread block is evicted; it means the cache is full, we need to reset the readahead.
             readahead.reset();
+            LOG_SUPPRESSOR.warn("The unread block is evicted, please increase the block cache size");
         }
     }
 
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java
new file mode 100644
index 0000000000..a8062fd19b
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package com.automq.stream.s3.cache.blockcache;
+
+import com.automq.stream.s3.ObjectReader;
+import com.automq.stream.s3.cache.ObjectReaderLRUCache;
+import com.automq.stream.s3.cache.ReadDataBlock;
+import com.automq.stream.s3.cache.S3BlockCache;
+import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.objects.ObjectManager;
+import com.automq.stream.s3.operator.S3Operator;
+import com.automq.stream.s3.trace.context.TraceContext;
+import com.automq.stream.utils.FutureUtil;
+import com.automq.stream.utils.Systems;
+import com.automq.stream.utils.ThreadUtils;
+import io.netty.channel.DefaultEventLoop;
+import io.netty.channel.EventLoop;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamReaders implements S3BlockCache {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamReaders.class);
+    private static final int MAX_OBJECT_READER_SIZE = 100 * 1024 * 1024; // 100MB
+    private static final long STREAM_READER_EXPIRED_MILLS = TimeUnit.MINUTES.toMillis(3);
+    private static final long STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS = TimeUnit.MINUTES.toMillis(1);
+    private final Cache[] caches;
+    private final DataBlockCache dataBlockCache;
+    private final ObjectReaderLRUCache objectReaders;
+    private final ObjectReaderFactory objectReaderFactory;
+
+    private final ObjectManager objectManager;
+    private final S3Operator s3Operator;
+
+    public StreamReaders(long size, ObjectManager objectManager, S3Operator s3Operator) {
+        this(size, objectManager, s3Operator, Systems.CPU_CORES);
+    }
+
+    public StreamReaders(long size, ObjectManager objectManager, S3Operator s3Operator, int concurrency) {
+        EventLoop[] eventLoops = new EventLoop[concurrency];
+        for (int i = 0; i < concurrency; i++) {
+            eventLoops[i] = new DefaultEventLoop(ThreadUtils.createThreadFactory("stream-reader-" + i, true));
+        }
+        this.caches = new Cache[concurrency];
+        for (int i = 0; i < concurrency; i++) {
+            caches[i] = new Cache(eventLoops[i]);
+        }
+        this.dataBlockCache = new DataBlockCache(size, eventLoops);
+        this.objectReaders = new ObjectReaderLRUCache(MAX_OBJECT_READER_SIZE);
+        this.objectReaderFactory = new ObjectReaderFactory();
+
+        this.objectManager = objectManager;
+        this.s3Operator = s3Operator;
+    }
+
+    @Override
+    public CompletableFuture<ReadDataBlock> read(TraceContext context, long streamId, long startOffset, long endOffset,
+        int maxBytes) {
+        Cache cache = caches[Math.abs((int) (streamId % caches.length))];
+        return cache.read(streamId, startOffset, endOffset, maxBytes);
+    }
+
+    static class StreamReaderKey {
+        final long streamId;
+        final long startOffset;
+
+        public StreamReaderKey(long streamId, long startOffset) {
+            this.streamId = streamId;
+            this.startOffset = startOffset;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            StreamReaderKey key = (StreamReaderKey) o;
+            return streamId == key.streamId && startOffset == key.startOffset;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(streamId, startOffset);
+        }
+
+        @Override
+        public String toString() {
+            return "StreamReaderKey{" +
+                "streamId=" + streamId +
+                ", startOffset=" + startOffset +
+                '}';
+        }
+    }
+
+    class Cache {
+        private final EventLoop eventLoop;
+        private final Map<StreamReaderKey, StreamReader> streamReaders = new HashMap<>();
+        private long lastStreamReaderExpiredCheckTime = System.currentTimeMillis();
+
+        public Cache(EventLoop eventLoop) {
+            this.eventLoop = eventLoop;
+        }
+
+        public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
+            long endOffset,
+            int maxBytes) {
+            CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
+            eventLoop.execute(() -> {
+                cleanupExpiredStreamReader();
+                StreamReaderKey key = new StreamReaderKey(streamId, startOffset);
+                StreamReader streamReader = streamReaders.computeIfAbsent(key, k -> new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache));
+                CompletableFuture<ReadDataBlock> streamReadCf = streamReader.read(startOffset, endOffset, maxBytes)
+                    .whenComplete((rst, ex) -> {
+                        if (ex != null) {
+                            LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail", streamId, startOffset, endOffset, maxBytes, ex);
+                        }
+                        // when two streams' read progress is the same, only one stream reader can be retained
+                        if (streamReaders.remove(key, streamReader) && ex == null) {
+                            streamReaders.put(new StreamReaderKey(streamId, streamReader.nextReadOffset()), streamReader);
+                        }
+                    });
+                FutureUtil.propagate(streamReadCf, cf);
+            });
+            return cf;
+        }
+
+        private void cleanupExpiredStreamReader() {
+            long now = System.currentTimeMillis();
+            if (now > lastStreamReaderExpiredCheckTime + STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS) {
+                lastStreamReaderExpiredCheckTime = now;
+                Iterator<Map.Entry<StreamReaderKey, StreamReader>> it = streamReaders.entrySet().iterator();
+                while (it.hasNext()) {
+                    Map.Entry<StreamReaderKey, StreamReader> entry = it.next();
+                    StreamReader streamReader = entry.getValue();
+                    if (now > streamReader.lastAccessTimestamp() + STREAM_READER_EXPIRED_MILLS) {
+                        streamReader.close();
+                        it.remove();
+                    }
+                }
+            }
+        }
+    }
+
+    class ObjectReaderFactory implements Function<S3ObjectMetadata, ObjectReader> {
+        @Override
+        public synchronized ObjectReader apply(S3ObjectMetadata metadata) {
+            ObjectReader objectReader = objectReaders.get(metadata.objectId());
+            if (objectReader == null) {
+                objectReader = new ObjectReader(metadata, s3Operator);
+                objectReaders.put(metadata.objectId(), objectReader);
+            }
+            return objectReader.retain();
+        }
+    }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/utils/LogSuppressor.java b/s3stream/src/main/java/com/automq/stream/utils/LogSuppressor.java
new file mode 100644
index 0000000000..2070e0253f
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/utils/LogSuppressor.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package com.automq.stream.utils;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+
+public class LogSuppressor {
+    private final Logger logger;
+    private final long intervalMills;
+    private final AtomicLong lastLogTime = new AtomicLong(System.currentTimeMillis());
+    private final AtomicInteger suppressedCount = new AtomicInteger();
+
+    public LogSuppressor(Logger logger, long intervalMills) {
+        this.logger = logger;
+        this.intervalMills = intervalMills;
+    }
+
+    public void warn(String message, Object... args) {
+        long now = System.currentTimeMillis();
+        for (; ; ) {
+            long last = lastLogTime.get();
+            if (now - last > intervalMills) {
+                if (lastLogTime.compareAndSet(last, now)) {
+                    logger.warn("[SUPPRESSED_TIMES=" + suppressedCount.getAndSet(0) + "] " + message, args);
+                    break;
+                }
+            } else {
+                suppressedCount.incrementAndGet();
+                break;
+            }
+        }
+    }
+
+}
diff --git a/s3stream/src/main/java/com/automq/stream/utils/Systems.java b/s3stream/src/main/java/com/automq/stream/utils/Systems.java
new file mode 100644
index 0000000000..ffa958a104
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/utils/Systems.java
@@ -0,0 +1,16 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package com.automq.stream.utils;
+
+public class Systems {
+    public static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
+}
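
Notes on the mechanisms above (the sketches below are illustrative, not code from the repository):

The hasPermits() -> requiredRelease() rename changes the cache's eviction trigger: DataBlockCache.evict0 now keeps freeing blocks not only while permits are exhausted (permits <= 0) but also while tasks are still queued behind the semaphore. The following is a minimal sketch of that predicate; MiniAsyncSemaphore and its release-on-completion behavior are simplifying assumptions, not the real AsyncSemaphore:

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Supplier;

    // Simplified stand-in: acquire() may drive permits negative; queued
    // tasks only run once release() restores a positive permit count.
    class MiniAsyncSemaphore {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private long permits;

        MiniAsyncSemaphore(long permits) { this.permits = permits; }

        // Run the task now if any permits remain, otherwise queue it.
        synchronized boolean acquire(long required, Supplier<CompletableFuture<Void>> task) {
            if (permits > 0) {
                permits -= required;
                task.get().whenComplete((r, e) -> release(required));
                return true;
            }
            tasks.add(() -> {
                permits -= required;
                task.get().whenComplete((r, e) -> release(required));
            });
            return false;
        }

        synchronized void release(long n) {
            permits += n;
            while (permits > 0 && !tasks.isEmpty()) {
                tasks.poll().run();
            }
        }

        // The patched predicate: space must still be freed while permits are
        // exhausted OR while any task waits in the queue.
        synchronized boolean requiredRelease() {
            return permits <= 0 || !tasks.isEmpty();
        }

        public static void main(String[] args) {
            MiniAsyncSemaphore sem = new MiniAsyncSemaphore(1);
            CompletableFuture<Void> first = new CompletableFuture<>();
            CompletableFuture<Void> second = new CompletableFuture<>();
            sem.acquire(2, () -> first);                 // permits: 1 -> -1
            sem.acquire(3, () -> second);                // queued: no permits left
            System.out.println(sem.requiredRelease());   // true: permits exhausted, task queued
            first.complete(null);                        // release(2): queued task runs, permits -> -2
            System.out.println(sem.requiredRelease());   // true again: the queued task went negative
            second.complete(null);                       // release(3): permits -> 1, queue empty
            System.out.println(sem.requiredRelease());   // false
        }
    }

This mirrors the in-code comment: with maxSize = 1, #getBlock(size=2) drives permits negative and #getBlock(3) queues, so checking permits alone after the first completion would stop eviction too early.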
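The DataBlock#getRecords change adds an else-branch that releases record batches the scan inspects but does not return, plugging a reference-count leak for skipped batches. The general idiom, with a hypothetical ref-counted Batch standing in for StreamRecordBatch:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    class ScanDemo {
        static final class Batch {
            final long baseOffset;
            final AtomicInteger refCnt = new AtomicInteger(1);
            Batch(long baseOffset) { this.baseOffset = baseOffset; }
            void retain() { refCnt.incrementAndGet(); }
            void release() { refCnt.decrementAndGet(); }
        }

        // Collect batches in [startOffset, endOffset); every batch we touch but
        // do not hand to the caller must be released, mirroring the added branch.
        static List<Batch> getRecords(List<Batch> block, long startOffset, long endOffset) {
            List<Batch> out = new ArrayList<>();
            for (Batch batch : block) {
                batch.retain();
                if (batch.baseOffset >= endOffset) {
                    batch.release(); // past the range: give the reference back
                    break;
                }
                if (batch.baseOffset < startOffset) {
                    batch.release(); // before the range: skipped, so release
                    continue;
                }
                out.add(batch); // ownership of this reference transfers to the caller
            }
            return out;
        }

        public static void main(String[] args) {
            List<Batch> block = List.of(new Batch(0), new Batch(10), new Batch(20), new Batch(30));
            // Batches 0 and 30 are touched but not returned; their extra refs are released.
            getRecords(block, 10, 30).forEach(b ->
                System.out.println(b.baseOffset + " refCnt=" + b.refCnt.get()));
        }
    }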
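StreamReaders#read selects caches[Math.abs((int) (streamId % caches.length))], pinning every read of a given stream to one event loop; the per-stream maps and StreamReader internals are then confined to a single thread and need no locks. The same idiom in isolation, using plain JDK executors instead of Netty's EventLoop (StreamPartitioner is a hypothetical name):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Partition work by key: every task for the same streamId lands on the same
    // single-threaded executor, so per-stream state stays thread-confined.
    class StreamPartitioner {
        private final ExecutorService[] loops;

        StreamPartitioner(int concurrency) {
            loops = new ExecutorService[concurrency];
            for (int i = 0; i < concurrency; i++) {
                loops[i] = Executors.newSingleThreadExecutor();
            }
        }

        void execute(long streamId, Runnable task) {
            // Math.abs guards against a negative remainder for negative ids,
            // matching the caches[Math.abs((int) (streamId % caches.length))] lookup.
            loops[Math.abs((int) (streamId % loops.length))].execute(task);
        }

        public static void main(String[] args) {
            StreamPartitioner p = new StreamPartitioner(4);
            p.execute(42L, () ->
                System.out.println("stream 42 on " + Thread.currentThread().getName()));
            // (executors left running here; a real implementation would shut them down)
        }
    }

The trade-off is that a single hot stream is limited to one core, but independent streams scale across Systems.CPU_CORES loops without contention.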
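When a read completes, Cache#read re-registers the StreamReader under (streamId, nextReadOffset) so the next sequential read finds a warm reader, and the two-argument Map#remove(key, value) ensures that when two readers converge on the same read progress, only one is kept. A standalone sketch of that re-keying step (Key and Reader are hypothetical stand-ins, single-threaded use as on the event loop):

    import java.util.HashMap;
    import java.util.Map;

    class ReKeyDemo {
        record Key(long streamId, long startOffset) {}

        static final class Reader {
            long nextReadOffset;
            Reader(long offset) { this.nextReadOffset = offset; }
        }

        public static void main(String[] args) {
            Map<Key, Reader> readers = new HashMap<>();
            Key key = new Key(1L, 100L);
            Reader reader = readers.computeIfAbsent(key, k -> new Reader(100L));

            reader.nextReadOffset = 150L; // pretend the read advanced the cursor

            // remove(key, value) only succeeds if this exact reader is still mapped;
            // a different reader registered under the same key is left untouched.
            if (readers.remove(key, reader)) {
                readers.put(new Key(1L, reader.nextReadOffset), reader);
            }

            // The next sequential read at offset 150 reuses the same reader.
            System.out.println(readers.get(new Key(1L, 150L)) == reader); // true
        }
    }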
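LogSuppressor collapses a hot-path warning to at most one line per interval and folds the number of swallowed calls into the log prefix. Usage as wired into StreamReader above, with a 30-second interval; ReadaheadMonitor is a hypothetical caller:

    import com.automq.stream.utils.LogSuppressor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class ReadaheadMonitor {
        private static final Logger LOGGER = LoggerFactory.getLogger(ReadaheadMonitor.class);
        // At most one warning every 30 seconds; intermediate calls only bump a counter.
        private static final LogSuppressor LOG_SUPPRESSOR = new LogSuppressor(LOGGER, 30000);

        void onUnreadBlockEvicted() {
            // Emitted as "[SUPPRESSED_TIMES=N] The unread block is evicted..." at most
            // once per interval; N counts the calls swallowed since the last line.
            LOG_SUPPRESSOR.warn("The unread block is evicted, please increase the block cache size");
        }
    }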