Skip to content

Commit

Permalink
feat(issues1087): allow readahead when cache is free (#1126)
Browse files Browse the repository at this point in the history
* feat(issues1087): allow readahead when cache is free

Signed-off-by: Robin Han <[email protected]>

* fix: fix flaky test

Signed-off-by: Robin Han <[email protected]>

---------

Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Apr 16, 2024
1 parent 2bcc5b9 commit 20e630a
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ByteBufAlloc {
public static final int STREAM_OBJECT_COMPACTION_WRITE = 8;
public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9;
public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;
public static final int BLOCK_CACHE = 11;
public static ByteBufAllocMetric byteBufAllocMetric = null;

/**
Expand All @@ -68,7 +69,7 @@ public class ByteBufAlloc {
registerAllocType(STREAM_OBJECT_COMPACTION_WRITE, "stream_object_compaction_write");
registerAllocType(STREAM_SET_OBJECT_COMPACTION_READ, "stream_set_object_compaction_read");
registerAllocType(STREAM_SET_OBJECT_COMPACTION_WRITE, "stream_set_object_compaction_write");

registerAllocType(BLOCK_CACHE, "block_cache");
}

/**
Expand Down
10 changes: 8 additions & 2 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE;
import static com.automq.stream.s3.ByteBufAlloc.READ_INDEX_BLOCK;
import static com.automq.stream.s3.ObjectWriter.Footer.FOOTER_SIZE;
import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;
Expand Down Expand Up @@ -71,7 +72,12 @@ public CompletableFuture<FindIndexResult> find(long streamId, long startOffset,

public CompletableFuture<DataBlockGroup> read(DataBlockIndex block) {
CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition(), ThrottleStrategy.CATCH_UP);
return rangeReadCf.thenApply(DataBlockGroup::new);
return rangeReadCf.thenApply(buf -> {
ByteBuf pooled = ByteBufAlloc.byteBuffer(buf.readableBytes(), BLOCK_CACHE);
pooled.writeBytes(buf);
buf.release();
return new DataBlockGroup(pooled);
});
}

void asyncGetBasicObjectInfo() {
Expand Down Expand Up @@ -392,7 +398,7 @@ public static class DataBlockGroup implements AutoCloseable {
private final int recordCount;

public DataBlockGroup(ByteBuf buf) {
this.buf = buf.duplicate();
this.buf = buf;
this.recordCount = check(buf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.utils.CloseableIterator;
import io.netty.buffer.ByteBuf;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

@EventLoopSafe public class DataBlock {
@EventLoopSafe public class DataBlock extends AbstractReferenceCounted {
private static final int UNREAD_INIT = -1;
private final long objectId;
private final DataBlockIndex dataBlockIndex;
Expand Down Expand Up @@ -58,9 +60,7 @@ public CompletableFuture<DataBlock> dataFuture() {
}

public void free() {
if (dataBlockGroup != null) {
dataBlockGroup.release();
}
release();
freeCf.complete(this);
}

Expand Down Expand Up @@ -101,14 +101,19 @@ public void markRead() {
}
}

public void retain() {
dataBlockGroup.retain();
@Override
public ReferenceCounted touch(Object hint) {
return null;
}

public void release() {
dataBlockGroup.release();
@Override
protected void deallocate() {
if (dataBlockGroup != null) {
dataBlockGroup.release();
}
}

// Only for test
ByteBuf dataBuf() {
return dataBlockGroup.buf();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.cache.LRUCache;
import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.threads.EventLoop;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -45,6 +48,7 @@ public DataBlockCache(long maxSize, EventLoop[] eventLoops) {
for (int i = 0; i < eventLoops.length; i++) {
caches[i] = new Cache(eventLoops[i]);
}
S3StreamMetricsManager.registerBlockCacheSizeSupplier(() -> maxSize - sizeLimiter.permits());
}

/**
Expand All @@ -61,6 +65,10 @@ public CompletableFuture<DataBlock> getBlock(ObjectReader objectReader, DataBloc
return cache.getBlock(objectReader, dataBlockIndex);
}

public long available() {
return sizeLimiter.permits();
}

@Override
public String toString() {
return "DataBlockCache{" +
Expand Down Expand Up @@ -107,14 +115,17 @@ public int hashCode() {
class Cache implements ReadStatusChangeListener {
final Map<DataBlockGroupKey, DataBlock> blocks = new HashMap<>();
final LRUCache<DataBlockGroupKey, DataBlock> lru = new LRUCache<>();
final Map<DataBlockGroupKey, DataBlock> inactive = new HashMap<>();
private final EventLoop eventLoop;

public Cache(EventLoop eventLoop) {
this.eventLoop = eventLoop;
}

public CompletableFuture<DataBlock> getBlock(ObjectReader objectReader, DataBlockIndex dataBlockIndex) {
return FutureUtil.exec(() -> getBlock0(objectReader, dataBlockIndex), LOGGER, "getBlock");
}

private CompletableFuture<DataBlock> getBlock0(ObjectReader objectReader, DataBlockIndex dataBlockIndex) {
long objectId = objectReader.metadata().objectId();
DataBlockGroupKey key = new DataBlockGroupKey(objectId, dataBlockIndex);
DataBlock dataBlock = blocks.get(key);
Expand All @@ -128,12 +139,18 @@ public CompletableFuture<DataBlock> getBlock(ObjectReader objectReader, DataBloc
CompletableFuture<DataBlock> cf = new CompletableFuture<>();
// if the data is already loaded, the listener will be invoked right now,
// else the listener will be invoked immediately after data loaded in the same eventLoop.
if (dataBlock.dataFuture().isDone()) {
StorageOperationStats.getInstance().blockCacheBlockHitThroughput.add(MetricsLevel.INFO, dataBlock.dataBlockIndex().size());
} else {
StorageOperationStats.getInstance().blockCacheBlockMissThroughput.add(MetricsLevel.INFO, dataBlock.dataBlockIndex().size());
}
// DataBlock#retain should will before the complete the future to avoid the other read use #markRead to really free the data block.
dataBlock.retain();
dataBlock.dataFuture().whenComplete((db, ex) -> {
if (ex != null) {
cf.completeExceptionally(ex);
return;
}
db.retain();
cf.complete(db);
});
return cf;
Expand All @@ -143,11 +160,14 @@ private void read(ObjectReader reader, DataBlock dataBlock, EventLoop eventLoop)
reader.retain();
boolean acquired = sizeLimiter.acquire(dataBlock.dataBlockIndex().size(), () -> {
reader.read(dataBlock.dataBlockIndex()).whenCompleteAsync((rst, ex) -> {
StorageOperationStats.getInstance().blockCacheReadS3Throughput.add(MetricsLevel.INFO, dataBlock.dataBlockIndex().size());
reader.release();
DataBlockGroupKey key = new DataBlockGroupKey(dataBlock.objectId(), dataBlock.dataBlockIndex());
if (ex != null) {
dataBlock.completeExceptionally(ex);
blocks.remove(key, dataBlock);
} else {
lru.put(new DataBlockGroupKey(dataBlock.objectId(), dataBlock.dataBlockIndex()), dataBlock);
lru.put(key, dataBlock);
dataBlock.complete(rst);
}
if (sizeLimiter.requiredRelease()) {
Expand All @@ -173,40 +193,30 @@ void evict() {
private void evict0() {
// TODO: avoid awake more tasks than necessary
while (sizeLimiter.requiredRelease()) {
Map.Entry<DataBlockGroupKey, DataBlock> entry = null;
if (!inactive.isEmpty()) {
Iterator<Map.Entry<DataBlockGroupKey, DataBlock>> it = inactive.entrySet().iterator();
if (it.hasNext()) {
entry = it.next();
it.remove();
lru.remove(entry.getKey());
}
}
if (entry == null) {
entry = lru.pop();
}
Map.Entry<DataBlockGroupKey, DataBlock> entry;
entry = lru.pop();
if (entry == null) {
break;
}
DataBlockGroupKey key = entry.getKey();
DataBlock dataBlock = entry.getValue();
if (blocks.remove(key, dataBlock)) {
dataBlock.free();
StorageOperationStats.getInstance().blockCacheBlockEvictThroughput.add(MetricsLevel.INFO, dataBlock.dataBlockIndex().size());
} else {
LOGGER.error("[BUG] duplicated free data block {}", dataBlock);
}
}
}

public void markUnread(DataBlock dataBlock) {
DataBlockGroupKey key = new DataBlockGroupKey(dataBlock.objectId(), dataBlock.dataBlockIndex());
inactive.remove(key, dataBlock);
}

public void markRead(DataBlock dataBlock) {
DataBlockGroupKey key = new DataBlockGroupKey(dataBlock.objectId(), dataBlock.dataBlockIndex());
if (dataBlock == blocks.get(key)) {
inactive.put(key, dataBlock);
if (blocks.remove(key, dataBlock)) {
lru.remove(key);
dataBlock.free();
}
}

Expand Down
Loading

0 comments on commit 20e630a

Please sign in to comment.