Skip to content

Commit

Permalink
fix(issues1798): fix stream reader memory leak
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx committed Aug 14, 2024
1 parent 4f4a93c commit 5095dbe
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,28 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@EventLoopSafe public class DataBlock extends AbstractReferenceCounted {
private static final Logger LOGGER = LoggerFactory.getLogger(DataBlock.class);
private static final int UNREAD_INIT = -1;
private final long objectId;
private final DataBlockIndex dataBlockIndex;
private final CompletableFuture<DataBlock> loadCf = new CompletableFuture<>();
private final CompletableFuture<DataBlock> freeCf = new CompletableFuture<>();
private final AtomicInteger unreadCnt = new AtomicInteger(UNREAD_INIT);
private ObjectReader.DataBlockGroup dataBlockGroup;
private long lastAccessTimestamp;

private final ReadStatusChangeListener listener;

private final CompletableFuture<DataBlock> freeCf = new CompletableFuture<>();
final List<FreeListener> freeListeners = new ArrayList<>();

private final Time time;

public DataBlock(long objectId, DataBlockIndex dataBlockIndex, ReadStatusChangeListener observeListener, Time time) {
public DataBlock(long objectId, DataBlockIndex dataBlockIndex, ReadStatusChangeListener observeListener,
Time time) {
this.objectId = objectId;
this.dataBlockIndex = dataBlockIndex;
this.listener = observeListener;
Expand All @@ -58,7 +65,7 @@ public void complete(ObjectReader.DataBlockGroup dataBlockGroup) {
*/
public void completeExceptionally(Throwable ex) {
loadCf.completeExceptionally(ex);
freeCf.complete(null);
free0();
}

public CompletableFuture<DataBlock> dataFuture() {
Expand All @@ -67,13 +74,36 @@ public CompletableFuture<DataBlock> dataFuture() {

public void free() {
release();
free0();
}

private void free0() {
freeCf.complete(this);
for (FreeListener listener : freeListeners) {
try {
listener.onFree(this);
} catch (Throwable e) {
LOGGER.error("invoke onFree fail", e);
}
}
freeListeners.clear();
}

public CompletableFuture<DataBlock> freeFuture() {
return freeCf;
}

public FreeListenerHandle registerFreeListener(FreeListener listener) {
if (freeCf.isDone()) {
listener.onFree(this);
return () -> {
};
} else {
freeListeners.add(listener);
return () -> freeListeners.remove(listener);
}
}

public long objectId() {
return objectId;
}
Expand Down Expand Up @@ -158,4 +188,13 @@ public List<StreamRecordBatch> getRecords(long startOffset, long endOffset, int
public String toString() {
return "DataBlock{" + "objectId=" + objectId + ", index=" + dataBlockIndex + '}';
}

public interface FreeListener {
void onFree(DataBlock dataBlock);
}

public interface FreeListenerHandle {
void close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.LogSuppressor;
import com.automq.stream.utils.threads.EventLoop;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
Expand All @@ -38,8 +39,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
Expand Down Expand Up @@ -145,7 +144,10 @@ public long lastAccessTimestamp() {

public void close() {
closed = true;
blocksMap.forEach((k, v) -> v.markRead());
List<Block> blocks = new ArrayList<>(blocksMap.values());
// The Block#markRead will immediately invoke after the Block is removed.
blocksMap.clear();
blocks.forEach(Block::markReadCompleted);
}

void read0(ReadContext ctx, final long startOffset, final long endOffset, final int maxBytes) {
Expand Down Expand Up @@ -226,7 +228,8 @@ void read0(ReadContext ctx, final long startOffset, final long endOffset, final
* This method is only for unit testing.
* AfterReadTryReadaheadCf is empty, the task has been completed
* AfterReadTryReadaheadCf is not empty, the task may be completed
* @return afterReadTryReadaheadCf
*
* @return afterReadTryReadaheadCf
*/
@VisibleForTesting
CompletableFuture<Void> getAfterReadTryReadaheadCf() {
Expand All @@ -237,7 +240,8 @@ CompletableFuture<Void> getAfterReadTryReadaheadCf() {
* This method is only for unit testing.
* inflightReadaheadCf is empty, the task has been completed
* inflightReadaheadCf is not empty, the task may be completed
* @return readahead.inflightReadaheadCf
*
* @return readahead.inflightReadaheadCf
*/
@VisibleForTesting
CompletableFuture<Void> getReadaheadInflightReadaheadCf() {
Expand All @@ -255,16 +259,13 @@ void afterRead(ReadDataBlock readDataBlock, ReadContext ctx) {
Block block = it.next().getValue();
if (block.index.endOffset() <= nextReadOffset) {
it.remove();
block.markReadCompleted();
} else {
break;
}
}
// #getDataBlock will invoke DataBlock#markUnread
for (Block block : ctx.blocks) {
block.release();
if (block.index.endOffset() <= nextReadOffset) {
block.markRead();
}
}
// try readahead to speed up the next read
afterReadTryReadaheadCf = eventLoop.submit(() -> readahead.tryReadahead(readDataBlock.getCacheAccessType() == BLOCK_CACHE_MISS));
Expand Down Expand Up @@ -458,8 +459,10 @@ private void handleBlockFree(Block block) {
}

private void resetBlocks() {
blocksMap.forEach((k, v) -> v.markRead());
List<Block> blocks = new ArrayList<>(blocksMap.values());
// The Block#markRead will immediately invoke after the Block is removed.
blocksMap.clear();
blocks.forEach(Block::markReadCompleted);
lastBlock = null;
loadedBlockIndexEndOffset = 0L;
blocksEpoch++;
Expand Down Expand Up @@ -519,28 +522,35 @@ class Block {
final S3ObjectMetadata metadata;
final DataBlockIndex index;
DataBlock data;
DataBlock.FreeListenerHandle freeListenerHandle;

CompletableFuture<Void> loadCf;
Throwable exception;
boolean released = false;
boolean readCompleted = false;

public Block(S3ObjectMetadata metadata, DataBlockIndex index) {
this.metadata = metadata;
this.index = index;
}

// TODO: use different Block type, cause of the returned Block shouldn't have markReadCompleted method
public Block newBlockWithData(boolean readahead) {
// We need to create a new block with consistent data to avoid duplicated release or leak,
// cause of the loaded data maybe evicted and reloaded.
Block newBlock = new Block(metadata, index);
ObjectReader objectReader = objectReaderFactory.get(metadata);
DataBlockCache.GetOptions getOptions = DataBlockCache.GetOptions.builder().readahead(readahead).build();
loadCf = dataBlockCache.getBlock(getOptions, objectReader, index).thenAccept(db -> {
newBlock.data = db;
if (data != db) {
loadCf = dataBlockCache.getBlock(getOptions, objectReader, index).thenAccept(newData -> {
newBlock.data = newData;
if (!readCompleted && data != newData) {
// the data block is first loaded or evict & reload
data = db;
db.markUnread();
data.freeFuture().whenComplete((nil, ex) -> handleBlockFree(this));
if (data != null) {
freeListenerHandle.close();
}
data = newData;
newData.markUnread();
freeListenerHandle = data.registerFreeListener(b -> handleBlockFree(this));
}
}).exceptionally(ex -> {
exception = ex;
Expand All @@ -563,9 +573,14 @@ public void release() {
});
}

public void markRead() {
/**
* The <code>Block#markReadCompleted</code> should be invoked after the Block was removed from <code>blocksMap</code>.
*/
public void markReadCompleted() {
readCompleted = true;
if (data != null) {
data.markRead();
freeListenerHandle.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,13 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
.whenComplete((rst, ex) -> {
if (ex != null) {
LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail", streamId, startOffset, endOffset, maxBytes, ex);
finalStreamReader.close();
} else {
// when two stream read progress is the same, only one stream reader can be retained
streamReaders.put(new StreamReaderKey(streamId, finalStreamReader.nextReadOffset()), finalStreamReader);
StreamReader oldStreamReader = streamReaders.put(new StreamReaderKey(streamId, finalStreamReader.nextReadOffset()), finalStreamReader);
if (oldStreamReader != null) {
oldStreamReader.close();
}
}
});
FutureUtil.propagate(streamReadCf, cf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.automq.stream.s3.operator.ObjectStorage;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.threads.EventLoop;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -31,6 +32,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -220,6 +222,11 @@ public void testRead_withReadahead() throws ExecutionException, InterruptedExcep
}).when(objectManager).isObjectExist(anyLong());
eventLoops[0].submit(() -> readCf.set(streamReader.read(14L, 15L, Integer.MAX_VALUE))).get();

// verify blocks free listener
List<StreamReader.Block> blocks = new ArrayList<>(streamReader.blocksMap.values());
blocks.forEach(b -> Assertions.assertFalse(b.data.freeListeners.isEmpty()));
eventLoops[0].submit(() -> streamReader.close());
blocks.forEach(b -> Assertions.assertTrue(b.data.freeListeners.isEmpty()));
}

public void waitForStreamReaderUpdate() throws ExecutionException, InterruptedException {
Expand Down

0 comments on commit 5095dbe

Please sign in to comment.