Skip to content

Commit

Permalink
feat(issues1087): add data block cache (#1107)
Browse files Browse the repository at this point in the history
* feat(issues1087): add data block cache

Signed-off-by: Robin Han <[email protected]>

* fix(test): unit test

Signed-off-by: Robin Han <[email protected]>

---------

Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Apr 10, 2024
1 parent 697df12 commit 14aac02
Show file tree
Hide file tree
Showing 12 changed files with 673 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public Optional<ByteBuf> get(String filePath, long position, int length) {
if (entry.getKey() + entry.getValue().dataLength < position + length) {
return Optional.empty();
}
lru.touch(new Key(filePath, cacheStartPosition));
lru.touchIfExist(new Key(filePath, cacheStartPosition));
MappedByteBuffer cacheByteBuffer = this.cacheByteBuffer.duplicate();
long nextPosition = position;
int remaining = length;
Expand Down
31 changes: 31 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public ObjectReader(S3ObjectMetadata metadata, S3Operator s3Operator) {
asyncGetBasicObjectInfo();
}

public S3ObjectMetadata metadata() {
return metadata;
}

public String objectKey() {
return objectKey;
}
Expand Down Expand Up @@ -116,6 +120,21 @@ public void close0() {
basicObjectInfoCf.thenAccept(BasicObjectInfo::close);
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ObjectReader reader = (ObjectReader) o;
return Objects.equals(metadata.objectId(), reader.metadata.objectId());
}

@Override
public int hashCode() {
return Objects.hash(metadata.objectId());
}

/**
*
*/
Expand Down Expand Up @@ -428,10 +447,22 @@ public int recordCount() {
return recordCount;
}

public ByteBuf buf() {
return buf;
}

@Override
public void close() {
buf.release();
}

public void retain() {
buf.retain();
}

public void release() {
buf.release();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ public GetCacheResult get0(long streamId, long startOffset, long endOffset, int
active.remove(cacheBlockKey);
inactive.put(cacheBlockKey, cacheBlock.size);
} else {
if (!active.touch(cacheBlockKey)) {
inactive.touch(cacheBlockKey);
if (!active.touchIfExist(cacheBlockKey)) {
inactive.touchIfExist(cacheBlockKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ public LRUCache() {
cacheEntrySet = cache.entrySet();
}

public synchronized boolean touch(K key) {
public synchronized boolean touchIfExist(K key) {
return cache.get(key) != null;
}

public synchronized void put(K key, V value) {
if (cache.put(key, value) != null) {
touch(key);
touchIfExist(key);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void updateReadProgress(long streamId, long startOffset) {
return;
}
agent.updateReadProgress(startOffset);
readAheadAgentLRUCache.touch(agent);
readAheadAgentLRUCache.touchIfExist(agent);
}
}
}
Expand All @@ -73,7 +73,7 @@ public ReadAheadAgent getReadAheadAgent(long streamId, long startOffset) {
synchronized (agentMap) {
ReadAheadAgent agent = agentMap.get(startOffset);
if (agent != null) {
readAheadAgentLRUCache.touch(agent);
readAheadAgentLRUCache.touchIfExist(agent);
}
return agent;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.cache.blockcache;

import io.netty.channel.EventLoop;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AsyncSemaphore {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSemaphore.class);
private final Queue<AsyncSemaphoreTask> tasks = new LinkedList<>();
private long permits;

public AsyncSemaphore(long permits) {
this.permits = permits;
}

/**
* Acquire permits, if permits are not enough, the task will be added to the queue
*
* @param requiredPermits the required permits
* @param task task to run when the permits are available, the task should return a CompletableFuture
* which will be completed when the permits could be released.
* @param eventLoop the eventLoop to run the task when the permits are available
* @return true if the permits are acquired, false if the task is added to the waiting queue.
*/
public synchronized boolean acquire(long requiredPermits, Supplier<CompletableFuture<?>> task,
EventLoop eventLoop) {
if (permits >= 0) {
// allow permits minus to negative
permits -= requiredPermits;
try {
task.get().whenComplete((nil, ex) -> release(requiredPermits));
} catch (Throwable e) {
LOGGER.error("Error in task", e);
}
return true;
} else {
tasks.add(new AsyncSemaphoreTask(requiredPermits, task, eventLoop));
return false;
}
}

public synchronized boolean hasPermits() {
return permits > 0;
}

public synchronized long permits() {
return permits;
}

synchronized void release(long requiredPermits) {
permits += requiredPermits;
if (permits > 0) {
AsyncSemaphoreTask t = tasks.poll();
if (t != null) {
// use eventLoop to reset the thread stack to avoid stack overflow
t.eventLoop.execute(() -> acquire(t.requiredPermits, t.task, t.eventLoop));
}
}
}

static class AsyncSemaphoreTask {
final long requiredPermits;
final Supplier<CompletableFuture<?>> task;
final EventLoop eventLoop;

public AsyncSemaphoreTask(long requiredPermits, Supplier<CompletableFuture<?>> task, EventLoop eventLoop) {
this.requiredPermits = requiredPermits;
this.task = task;
this.eventLoop = eventLoop;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.cache.blockcache;

import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.ObjectReader;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

@EventLoopSafe
public class DataBlock {
private static final int UNREAD_INIT = -1;
private final long objectId;
private final DataBlockIndex dataBlockIndex;
private final CompletableFuture<DataBlock> loadCf = new CompletableFuture<>();
private final CompletableFuture<DataBlock> freeCf = new CompletableFuture<>();
private final AtomicInteger unreadCnt = new AtomicInteger(UNREAD_INIT);
private final ReadStatusChangeListener listener;
private ObjectReader.DataBlockGroup dataBlockGroup;

public DataBlock(long objectId, DataBlockIndex dataBlockIndex, ReadStatusChangeListener observeListener) {
this.objectId = objectId;
this.dataBlockIndex = dataBlockIndex;
this.listener = observeListener;
}

/**
* Complete the data loading
*/
public void complete(ObjectReader.DataBlockGroup dataBlockGroup) {
this.dataBlockGroup = dataBlockGroup;
loadCf.complete(this);
}

/**
* Complete the data loading with exception
*/
public void completeExceptionally(Throwable ex) {
loadCf.completeExceptionally(ex);
freeCf.complete(null);
}

public CompletableFuture<DataBlock> dataFuture() {
return loadCf;
}

public void free() {
if (dataBlockGroup != null) {
dataBlockGroup.release();
}
freeCf.complete(this);
}

public CompletableFuture<DataBlock> freeFuture() {
return freeCf;
}

public long objectId() {
return objectId;
}

public DataBlockIndex dataBlockIndex() {
return dataBlockIndex;
}

public void markUnread() {
if (unreadCnt.get() == UNREAD_INIT) {
unreadCnt.set(1);
} else {
int old = unreadCnt.getAndIncrement();
if (old == 0) {
// observe
listener.markUnread(this);
}
}
}

public void markRead() {
int unreadCnt = this.unreadCnt.decrementAndGet();
if (unreadCnt <= 0) {
listener.markRead(this);
}
}

public void retain() {
dataBlockGroup.retain();
}

public void release() {
dataBlockGroup.release();
}

ByteBuf dataBuf() {
return dataBlockGroup.buf();
}
}
Loading

0 comments on commit 14aac02

Please sign in to comment.