feat(issues1087): add stream readers (#1116)
Signed-off-by: Robin Han <[email protected]>
superhx authored Apr 12, 2024
1 parent 2d0123b commit f2e4236
Showing 8 changed files with 272 additions and 19 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
@@ -19,8 +19,8 @@
import com.automq.stream.s3.Config;
import com.automq.stream.s3.S3Storage;
import com.automq.stream.s3.S3StreamClient;
import com.automq.stream.s3.cache.DefaultS3BlockCache;
import com.automq.stream.s3.cache.S3BlockCache;
import com.automq.stream.s3.cache.blockcache.StreamReaders;
import com.automq.stream.s3.compact.CompactionManager;
import com.automq.stream.s3.failover.Failover;
import com.automq.stream.s3.failover.FailoverRequest;
@@ -94,7 +94,7 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) {
this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext);
this.streamManager = new ControllerStreamManager(this.metadataManager, this.requestSender, kafkaConfig);
this.objectManager = new ControllerObjectManager(this.requestSender, this.metadataManager, kafkaConfig);
this.blockCache = new DefaultS3BlockCache(this.config, objectManager, s3Operator);
this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, s3Operator);
this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionS3Operator);
this.writeAheadLog = BlockWALService.builder(this.config.walPath(), this.config.walCapacity()).config(this.config).build();
this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, s3Operator);
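A sketch (not part of the commit) of how the new cache is consumed: the blockCache handed to S3Storage is now a StreamReaders instance, but callers still go through the S3BlockCache interface, whose read signature matches StreamReaders#read later in this commit. The helper method below is illustrative only, and the usual imports (Config, ObjectManager, S3Operator, TraceContext, ReadDataBlock, CompletableFuture) are assumed.

// Illustrative helper, not present in the commit.
static CompletableFuture<ReadDataBlock> readThroughCache(Config config, ObjectManager objectManager,
    S3Operator s3Operator, TraceContext context, long streamId, long startOffset, long endOffset, int maxBytes) {
    S3BlockCache blockCache = new StreamReaders(config.blockCacheSize(), objectManager, s3Operator);
    return blockCache.read(context, streamId, startOffset, endOffset, maxBytes)
        .whenComplete((readDataBlock, ex) -> {
            // On success the caller owns the returned StreamRecordBatch instances and must
            // release() them after use; they are reference counted (see the release() calls below).
        });
}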
@@ -54,8 +54,8 @@ public synchronized boolean acquire(long requiredPermits, Supplier<CompletableFu
}
}

public synchronized boolean hasPermits() {
return permits > 0;
public synchronized boolean requiredRelease() {
return permits <= 0 || !tasks.isEmpty();
}

public synchronized long permits() {
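The rename from hasPermits() to requiredRelease() captures the new contract: the caller must keep releasing capacity (evicting blocks, in DataBlockCache below) not only while permits are exhausted but also while acquirers are still queued. A self-contained sketch of that contract follows; it illustrates the semantics and is not the actual AsyncSemaphore implementation.

// Sketch of the requiredRelease() contract; names and structure are illustrative.
final class PermitGate {
    private record Waiter(long required, Runnable task) { }

    private long permits;
    private final java.util.ArrayDeque<Waiter> waiters = new java.util.ArrayDeque<>();

    PermitGate(long permits) {
        this.permits = permits;
    }

    // Runs the task immediately while permits remain (they may go negative, like a size limiter
    // admitting an oversized block); otherwise parks it until enough capacity is released.
    synchronized boolean acquire(long required, Runnable task) {
        if (permits > 0 && waiters.isEmpty()) {
            permits -= required;
            task.run();
            return true;
        }
        waiters.add(new Waiter(required, task));
        return false;
    }

    synchronized void release(long released) {
        permits += released;
        while (permits > 0 && !waiters.isEmpty()) {
            Waiter waiter = waiters.poll();
            permits -= waiter.required();
            waiter.task().run();
        }
    }

    // True while the caller still owes capacity: permits are exhausted or acquirers are queued.
    synchronized boolean requiredRelease() {
        return permits <= 0 || !waiters.isEmpty();
    }
}

In the scenario described in the DataBlockCache comment below (maxSize set to 1), #getBlock(size=2) over-acquires and drives permits negative, #getBlock(size=3) is queued, and requiredRelease() stays true after the first block completes until enough has been evicted for the queued request to run. The practical difference from the old hasPermits() check is that eviction now also continues when some permits are free but a queued request still needs more than is currently available.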
@@ -121,6 +121,8 @@ public List<StreamRecordBatch> getRecords(long startOffset, long endOffset, int
break;
}
continue;
} else {
recordBatch.release();
}
if (recordBatch.getBaseOffset() >= endOffset) {
break;
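The two lines added here release record batches that getRecords skips. Because the surrounding condition is collapsed in this view, the snippet below only illustrates the ownership rule being enforced, not the method's actual filter: StreamRecordBatch is reference counted, so any batch that is scanned but not handed back to the caller must be released. The method name and filter are hypothetical; java.util imports are assumed.

// Illustrative only: skipped batches are released, returned batches are released later by the caller.
List<StreamRecordBatch> selectInRange(List<StreamRecordBatch> scanned, long endOffset) {
    List<StreamRecordBatch> result = new ArrayList<>();
    for (StreamRecordBatch recordBatch : scanned) {
        if (recordBatch.getBaseOffset() >= endOffset) {
            recordBatch.release(); // skipped, so release here; leaking this is what the hunk fixes
            continue;
        }
        result.add(recordBatch);
    }
    return result;
}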
@@ -14,7 +14,7 @@
import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.cache.LRUCache;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -35,7 +35,7 @@ public class DataBlockCache {
final AsyncSemaphore sizeLimiter;
private final long maxSize;

public DataBlockCache(long maxSize, DefaultEventLoop[] eventLoops) {
public DataBlockCache(long maxSize, EventLoop[] eventLoops) {
this.maxSize = maxSize;
this.sizeLimiter = new AsyncSemaphore(maxSize);
this.caches = new Cache[eventLoops.length];
@@ -105,16 +105,13 @@ class Cache implements ReadStatusChangeListener {
final Map<DataBlockGroupKey, DataBlock> blocks = new HashMap<>();
final LRUCache<DataBlockGroupKey, DataBlock> lru = new LRUCache<>();
final Map<DataBlockGroupKey, DataBlock> inactive = new HashMap<>();
private final DefaultEventLoop eventLoop;
private final EventLoop eventLoop;

public Cache(DefaultEventLoop eventLoop) {
public Cache(EventLoop eventLoop) {
this.eventLoop = eventLoop;
}

public CompletableFuture<DataBlock> getBlock(ObjectReader objectReader, DataBlockIndex dataBlockIndex) {
if (Thread.currentThread().getId() != eventLoop.threadProperties().id()) {
throw new IllegalStateException("getBlock must be invoked in the same eventLoop");
}
long objectId = objectReader.metadata().objectId();
DataBlockGroupKey key = new DataBlockGroupKey(objectId, dataBlockIndex);
DataBlock dataBlock = blocks.get(key);
@@ -139,7 +136,7 @@ public CompletableFuture<DataBlock> getBlock(ObjectReader objectReader, DataBloc
return cf;
}

private void read(ObjectReader reader, DataBlock dataBlock, DefaultEventLoop eventLoop) {
private void read(ObjectReader reader, DataBlock dataBlock, EventLoop eventLoop) {
reader.retain();
boolean acquired = sizeLimiter.acquire(dataBlock.dataBlockIndex().size(), () -> {
reader.read(dataBlock.dataBlockIndex()).whenCompleteAsync((rst, ex) -> {
@@ -150,7 +147,7 @@ private void read(ObjectReader reader, DataBlock dataBlock, DefaultEventLoop eve
dataBlock.complete(rst);
}
lru.put(new DataBlockGroupKey(dataBlock.objectId(), dataBlock.dataBlockIndex()), dataBlock);
if (!sizeLimiter.hasPermits()) {
if (sizeLimiter.requiredRelease()) {
// Consider this scenario: with maxSize set to 1, requests #getBlock(size=2) and #getBlock(size=3) arrive in sequence,
// and #getBlock(size=3) waits in the queue until permits become available.
// If, after #getBlock(size=2) completes, permits are still lacking in the sizeLimiter, implying queued tasks,
@@ -175,7 +172,7 @@ void evict() {
}

private void evict0() {
while (!sizeLimiter.hasPermits()) {
while (sizeLimiter.requiredRelease()) {
Map.Entry<DataBlockGroupKey, DataBlock> entry = null;
if (!inactive.isEmpty()) {
Iterator<Map.Entry<DataBlockGroupKey, DataBlock>> it = inactive.entrySet().iterator();
@@ -18,6 +18,7 @@
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.utils.LogSuppressor;
import io.netty.channel.EventLoop;
import java.util.ArrayList;
import java.util.HashMap;
@@ -40,6 +41,7 @@ public class StreamReader {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamReader.class);
private static final int DEFAULT_READAHEAD_SIZE = 1024 * 1024 / 2;
private static final int MAX_READAHEAD_SIZE = 32 * 1024 * 1024;
private static final LogSuppressor LOG_SUPPRESSOR = new LogSuppressor(LOGGER, 30000);
// visible to test
final NavigableMap<Long, Block> blocksMap = new TreeMap<>();
final Readahead readahead;
@@ -51,6 +53,7 @@ public class StreamReader {
long nextReadOffset;
long loadedBlockIndexEndOffset = 0L;
private CompletableFuture<Map<Long, Block>> inflightLoadIndexCf;
private long lastAccessTimestamp = System.currentTimeMillis();

public StreamReader(
long streamId, long nextReadOffset, EventLoop eventLoop,
@@ -69,14 +72,32 @@ public StreamReader(
}

public CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, int maxBytes) {
// TODO: only allow one inflight read
lastAccessTimestamp = System.currentTimeMillis();
ReadContext readContext = new ReadContext();
read0(readContext, startOffset, endOffset, maxBytes);
readContext.cf.thenAccept(this::afterRead).exceptionally(ex -> {
readContext.records.forEach(StreamRecordBatch::release);
return null;
return readContext.cf.whenComplete((rst, ex) -> {
if (ex != null) {
readContext.records.forEach(StreamRecordBatch::release);
} else {
afterRead(rst);
}
});
}

public long nextReadOffset() {
return nextReadOffset;
}

public long lastAccessTimestamp() {
return lastAccessTimestamp;
}

public void close() {
blocksMap.forEach((k, v) -> {
if (v.data != null) {
v.data.markRead();
}
});
return readContext.cf;
}

void read0(ReadContext ctx, long startOffset, long endOffset, int maxBytes) {
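The read() rewrite above folds the old thenAccept/exceptionally pair into a single whenComplete stage: partially collected records are released on failure, afterRead runs only on success, and the future handed back to the caller still completes with the original result or exception. A standalone illustration of that shape, with names chosen only for the example and the usual java.util.concurrent and java.util.function imports assumed:

// Illustration of the whenComplete shape used by read() above.
static <T> CompletableFuture<T> completeWithCleanup(CompletableFuture<T> source,
    Runnable releasePartial, java.util.function.Consumer<T> afterRead) {
    return source.whenComplete((rst, ex) -> {
        if (ex != null) {
            releasePartial.run();  // e.g. readContext.records.forEach(StreamRecordBatch::release)
        } else {
            afterRead.accept(rst); // bookkeeping that must only run on success
        }
    });                            // the returned future carries the same outcome as 'source'
}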
@@ -280,6 +301,7 @@ private void handleBlockFree(Block block) {
if (block == blockInMap) {
// The unread block was evicted, which means the cache is full, so reset the readahead.
readahead.reset();
LOG_SUPPRESSOR.warn("The unread block is evicted, please increase the block cache size");
}
}

171 changes: 171 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java
@@ -0,0 +1,171 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.cache.blockcache;

import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.cache.ObjectReaderLRUCache;
import com.automq.stream.s3.cache.ReadDataBlock;
import com.automq.stream.s3.cache.S3BlockCache;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.Systems;
import com.automq.stream.utils.ThreadUtils;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamReaders implements S3BlockCache {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamReaders.class);
private static final int MAX_OBJECT_READER_SIZE = 100 * 1024 * 1024; // 100MB;
private static final long STREAM_READER_EXPIRED_MILLS = TimeUnit.MINUTES.toMillis(3);
private static final long STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS = TimeUnit.MINUTES.toMillis(1);
private final Cache[] caches;
private final DataBlockCache dataBlockCache;
private final ObjectReaderLRUCache objectReaders;
private final ObjectReaderFactory objectReaderFactory;

private final ObjectManager objectManager;
private final S3Operator s3Operator;

public StreamReaders(long size, ObjectManager objectManager, S3Operator s3Operator) {
this(size, objectManager, s3Operator, Systems.CPU_CORES);
}

public StreamReaders(long size, ObjectManager objectManager, S3Operator s3Operator, int concurrency) {
EventLoop[] eventLoops = new EventLoop[concurrency];
for (int i = 0; i < concurrency; i++) {
eventLoops[i] = new DefaultEventLoop(ThreadUtils.createThreadFactory("stream-reader-" + i, true));
}
this.caches = new Cache[concurrency];
for (int i = 0; i < concurrency; i++) {
caches[i] = new Cache(eventLoops[i]);
}
this.dataBlockCache = new DataBlockCache(size, eventLoops);
this.objectReaders = new ObjectReaderLRUCache(MAX_OBJECT_READER_SIZE);
this.objectReaderFactory = new ObjectReaderFactory();

this.objectManager = objectManager;
this.s3Operator = s3Operator;
}

@Override
public CompletableFuture<ReadDataBlock> read(TraceContext context, long streamId, long startOffset, long endOffset,
int maxBytes) {
Cache cache = caches[Math.abs((int) (streamId % caches.length))];
return cache.read(streamId, startOffset, endOffset, maxBytes);
}

static class StreamReaderKey {
final long streamId;
final long startOffset;

public StreamReaderKey(long streamId, long startOffset) {
this.streamId = streamId;
this.startOffset = startOffset;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
StreamReaderKey key = (StreamReaderKey) o;
return streamId == key.streamId && startOffset == key.startOffset;
}

@Override
public int hashCode() {
return Objects.hash(streamId, startOffset);
}

@Override
public String toString() {
return "StreamReaderKey{" +
"streamId=" + streamId +
", startOffset=" + startOffset +
'}';
}
}

class Cache {
private final EventLoop eventLoop;
private final Map<StreamReaderKey, StreamReader> streamReaders = new HashMap<>();
private long lastStreamReaderExpiredCheckTime = System.currentTimeMillis();

public Cache(EventLoop eventLoop) {
this.eventLoop = eventLoop;
}

public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
long endOffset,
int maxBytes) {
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
eventLoop.execute(() -> {
cleanupExpiredStreamReader();
StreamReaderKey key = new StreamReaderKey(streamId, startOffset);
StreamReader streamReader = streamReaders.computeIfAbsent(key, k -> new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache));
CompletableFuture<ReadDataBlock> streamReadCf = streamReader.read(startOffset, endOffset, maxBytes)
.whenComplete((rst, ex) -> {
if (ex != null) {
LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail", streamId, startOffset, endOffset, maxBytes, ex);
}
// when two stream readers reach the same read progress, only one of them is retained
if (streamReaders.remove(key, streamReader) && ex == null) {
streamReaders.put(new StreamReaderKey(streamId, streamReader.nextReadOffset()), streamReader);
}
});
FutureUtil.propagate(streamReadCf, cf);
});
return cf;
}

private void cleanupExpiredStreamReader() {
long now = System.currentTimeMillis();
if (now > lastStreamReaderExpiredCheckTime + STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS) {
lastStreamReaderExpiredCheckTime = now;
Iterator<Map.Entry<StreamReaderKey, StreamReader>> it = streamReaders.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<StreamReaderKey, StreamReader> entry = it.next();
StreamReader streamReader = entry.getValue();
if (now > streamReader.lastAccessTimestamp() + STREAM_READER_EXPIRED_MILLS) {
streamReader.close();
it.remove();
}
}
}
}
}

class ObjectReaderFactory implements Function<S3ObjectMetadata, ObjectReader> {
@Override
public synchronized ObjectReader apply(S3ObjectMetadata metadata) {
ObjectReader objectReader = objectReaders.get(metadata.objectId());
if (objectReader == null) {
objectReader = new ObjectReader(metadata, s3Operator);
objectReaders.put(metadata.objectId(), objectReader);
}
return objectReader.retain();
}
}
}
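A usage sketch (not from the commit) showing why the key migration in Cache#read matters: a sequential consumer keeps hitting the same StreamReader, and with it the readahead state, because the reader is re-registered under its nextReadOffset() after every successful read. The objectManager, s3Operator and context variables, the stream id, offsets, cache size and concurrency below are all illustrative.

// Illustrative sequential read against the same stream.
StreamReaders readers = new StreamReaders(64L * 1024 * 1024, objectManager, s3Operator, 2);
readers.read(context, 42L, 0L, 100L, 1024 * 1024)
    .thenCompose(rst -> {
        // Release the batches in rst once consumed, then continue where the first read stopped;
        // the second call can find the StreamReader re-keyed at its new nextReadOffset()
        // and reuse its readahead instead of creating a fresh reader.
        return readers.read(context, 42L, 100L, 200L, 1024 * 1024);
    });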
45 changes: 45 additions & 0 deletions s3stream/src/main/java/com/automq/stream/utils/LogSuppressor.java
@@ -0,0 +1,45 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.utils;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

public class LogSuppressor {
private final Logger logger;
private final long intervalMills;
private final AtomicLong lastLogTime = new AtomicLong(System.currentTimeMillis());
private AtomicInteger supressedCount = new AtomicInteger();

public LogSuppressor(Logger logger, long intervalMills) {
this.logger = logger;
this.intervalMills = intervalMills;
}

public void warn(String message, Object... args) {
long now = System.currentTimeMillis();
for (; ; ) {
long last = lastLogTime.get();
if (now - last > intervalMills) {
if (lastLogTime.compareAndSet(last, now)) {
logger.warn("[SUPPRESSED_TIME=" + supressedCount.getAndSet(0) + "] " + message, args);
break;
}
} else {
supressedCount.incrementAndGet();
break;
}
}
}

}
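Usage mirrors the LOG_SUPPRESSOR field added to StreamReader above: one suppressor per message site, at most one WARN per interval, with the number of suppressed calls folded into the next emitted line. The class and method names in this sketch are illustrative.

// Illustrative call site; the 30 second interval mirrors StreamReader's usage above.
private static final Logger LOGGER = LoggerFactory.getLogger(BlockEvictionMonitor.class);
private static final LogSuppressor EVICTION_LOG = new LogSuppressor(LOGGER, 30_000);

void onUnreadBlockEvicted(long objectId) {
    // Intermediate calls inside the interval only bump the suppressed counter.
    EVICTION_LOG.warn("unread block of object {} evicted, consider increasing the block cache size", objectId);
}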