From 5095dbe505a9bf48ff80d081430bb3049e0b8b2e Mon Sep 17 00:00:00 2001 From: Robin Han Date: Wed, 14 Aug 2024 20:41:21 +0800 Subject: [PATCH] fix(issues1798): fix stream reader memory leak Signed-off-by: Robin Han --- .../stream/s3/cache/blockcache/DataBlock.java | 45 +++++++++++++++-- .../s3/cache/blockcache/StreamReader.java | 49 ++++++++++++------- .../s3/cache/blockcache/StreamReaders.java | 6 ++- .../s3/cache/blockcache/StreamReaderTest.java | 7 +++ 4 files changed, 86 insertions(+), 21 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java index 717c7fec30..e54d1be4e7 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java @@ -23,21 +23,28 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @EventLoopSafe public class DataBlock extends AbstractReferenceCounted { + private static final Logger LOGGER = LoggerFactory.getLogger(DataBlock.class); private static final int UNREAD_INIT = -1; private final long objectId; private final DataBlockIndex dataBlockIndex; private final CompletableFuture loadCf = new CompletableFuture<>(); - private final CompletableFuture freeCf = new CompletableFuture<>(); private final AtomicInteger unreadCnt = new AtomicInteger(UNREAD_INIT); private ObjectReader.DataBlockGroup dataBlockGroup; private long lastAccessTimestamp; private final ReadStatusChangeListener listener; + + private final CompletableFuture freeCf = new CompletableFuture<>(); + final List freeListeners = new ArrayList<>(); + private final Time time; - public DataBlock(long objectId, DataBlockIndex dataBlockIndex, ReadStatusChangeListener observeListener, Time time) { + public DataBlock(long objectId, 
DataBlockIndex dataBlockIndex, ReadStatusChangeListener observeListener, + Time time) { this.objectId = objectId; this.dataBlockIndex = dataBlockIndex; this.listener = observeListener; @@ -58,7 +65,7 @@ public void complete(ObjectReader.DataBlockGroup dataBlockGroup) { */ public void completeExceptionally(Throwable ex) { loadCf.completeExceptionally(ex); - freeCf.complete(null); + free0(); } public CompletableFuture dataFuture() { @@ -67,13 +74,36 @@ public CompletableFuture dataFuture() { public void free() { release(); + free0(); + } + + private void free0() { freeCf.complete(this); + for (FreeListener listener : freeListeners) { + try { + listener.onFree(this); + } catch (Throwable e) { + LOGGER.error("invoke onFree fail", e); + } + } + freeListeners.clear(); } public CompletableFuture freeFuture() { return freeCf; } + public FreeListenerHandle registerFreeListener(FreeListener listener) { + if (freeCf.isDone()) { + listener.onFree(this); + return () -> { + }; + } else { + freeListeners.add(listener); + return () -> freeListeners.remove(listener); + } + } + public long objectId() { return objectId; } @@ -158,4 +188,13 @@ public List getRecords(long startOffset, long endOffset, int public String toString() { return "DataBlock{" + "objectId=" + objectId + ", index=" + dataBlockIndex + '}'; } + + public interface FreeListener { + void onFree(DataBlock dataBlock); + } + + public interface FreeListenerHandle { + void close(); + } + } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java index 638df26fb4..438a2dd77a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java @@ -27,6 +27,7 @@ import com.automq.stream.utils.FutureUtil; import com.automq.stream.utils.LogSuppressor; import com.automq.stream.utils.threads.EventLoop; +import 
com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -38,8 +39,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; @@ -145,7 +144,10 @@ public long lastAccessTimestamp() { public void close() { closed = true; - blocksMap.forEach((k, v) -> v.markRead()); + List blocks = new ArrayList<>(blocksMap.values()); + // Block#markReadCompleted must be invoked after the Block is removed from blocksMap. + blocksMap.clear(); + blocks.forEach(Block::markReadCompleted); } void read0(ReadContext ctx, final long startOffset, final long endOffset, final int maxBytes) { @@ -226,7 +228,8 @@ void read0(ReadContext ctx, final long startOffset, final long endOffset, final * This method is only for unit testing. * AfterReadTryReadaheadCf is empty, the task has been completed * AfterReadTryReadaheadCf is not empty, the task may be completed - * @return afterReadTryReadaheadCf + * + * @return afterReadTryReadaheadCf */ @VisibleForTesting CompletableFuture getAfterReadTryReadaheadCf() { @@ -237,7 +240,8 @@ CompletableFuture getAfterReadTryReadaheadCf() { * This method is only for unit testing. 
* inflightReadaheadCf is empty, the task has been completed * inflightReadaheadCf is not empty, the task may be completed - * @return readahead.inflightReadaheadCf + * + * @return readahead.inflightReadaheadCf */ @VisibleForTesting CompletableFuture getReadaheadInflightReadaheadCf() { @@ -255,16 +259,13 @@ void afterRead(ReadDataBlock readDataBlock, ReadContext ctx) { Block block = it.next().getValue(); if (block.index.endOffset() <= nextReadOffset) { it.remove(); + block.markReadCompleted(); } else { break; } } - // #getDataBlock will invoke DataBlock#markUnread for (Block block : ctx.blocks) { block.release(); - if (block.index.endOffset() <= nextReadOffset) { - block.markRead(); - } } // try readahead to speed up the next read afterReadTryReadaheadCf = eventLoop.submit(() -> readahead.tryReadahead(readDataBlock.getCacheAccessType() == BLOCK_CACHE_MISS)); @@ -458,8 +459,10 @@ private void handleBlockFree(Block block) { } private void resetBlocks() { - blocksMap.forEach((k, v) -> v.markRead()); + List blocks = new ArrayList<>(blocksMap.values()); + // Block#markReadCompleted must be invoked after the Block is removed from blocksMap. blocksMap.clear(); + blocks.forEach(Block::markReadCompleted); lastBlock = null; loadedBlockIndexEndOffset = 0L; blocksEpoch++; @@ -519,28 +522,35 @@ class Block { final S3ObjectMetadata metadata; final DataBlockIndex index; DataBlock data; + DataBlock.FreeListenerHandle freeListenerHandle; + CompletableFuture loadCf; Throwable exception; boolean released = false; + boolean readCompleted = false; public Block(S3ObjectMetadata metadata, DataBlockIndex index) { this.metadata = metadata; this.index = index; } + // TODO: use a different Block type, because the returned Block shouldn't expose the markReadCompleted method public Block newBlockWithData(boolean readahead) { // We need to create a new block with consistent data to avoid duplicated release or leak, // cause of the loaded data maybe evicted and reloaded. 
Block newBlock = new Block(metadata, index); ObjectReader objectReader = objectReaderFactory.get(metadata); DataBlockCache.GetOptions getOptions = DataBlockCache.GetOptions.builder().readahead(readahead).build(); - loadCf = dataBlockCache.getBlock(getOptions, objectReader, index).thenAccept(db -> { - newBlock.data = db; - if (data != db) { + loadCf = dataBlockCache.getBlock(getOptions, objectReader, index).thenAccept(newData -> { + newBlock.data = newData; + if (!readCompleted && data != newData) { // the data block is first loaded or evict & reload - data = db; - db.markUnread(); - data.freeFuture().whenComplete((nil, ex) -> handleBlockFree(this)); + if (data != null) { + freeListenerHandle.close(); + } + data = newData; + newData.markUnread(); + freeListenerHandle = data.registerFreeListener(b -> handleBlockFree(this)); } }).exceptionally(ex -> { exception = ex; @@ -563,9 +573,14 @@ public void release() { }); } - public void markRead() { + /** + * The Block#markReadCompleted should be invoked after the Block was removed from blocksMap. 
+ */ + public void markReadCompleted() { + readCompleted = true; if (data != null) { data.markRead(); + freeListenerHandle.close(); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java index abeac076b5..cf064887d4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java @@ -126,9 +126,13 @@ public CompletableFuture read(long streamId, long startOffset, .whenComplete((rst, ex) -> { if (ex != null) { LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail", streamId, startOffset, endOffset, maxBytes, ex); + finalStreamReader.close(); } else { // when two stream read progress is the same, only one stream reader can be retained - streamReaders.put(new StreamReaderKey(streamId, finalStreamReader.nextReadOffset()), finalStreamReader); + StreamReader oldStreamReader = streamReaders.put(new StreamReaderKey(streamId, finalStreamReader.nextReadOffset()), finalStreamReader); + if (oldStreamReader != null) { + oldStreamReader.close(); + } } }); FutureUtil.propagate(streamReadCf, cf); diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReaderTest.java index ee2373fd8f..cba4fc211f 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReaderTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReaderTest.java @@ -22,6 +22,7 @@ import com.automq.stream.s3.operator.ObjectStorage; import com.automq.stream.utils.FutureUtil; import com.automq.stream.utils.threads.EventLoop; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,6 +32,7 @@ import java.util.concurrent.TimeoutException; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -220,6 +222,11 @@ public void testRead_withReadahead() throws ExecutionException, InterruptedExcep }).when(objectManager).isObjectExist(anyLong()); eventLoops[0].submit(() -> readCf.set(streamReader.read(14L, 15L, Integer.MAX_VALUE))).get(); + // verify free listeners are registered after the read and removed once close() has finished on the event loop + List blocks = new ArrayList<>(streamReader.blocksMap.values()); + blocks.forEach(b -> Assertions.assertFalse(b.data.freeListeners.isEmpty())); + eventLoops[0].submit(() -> streamReader.close()).get(); + blocks.forEach(b -> Assertions.assertTrue(b.data.freeListeners.isEmpty())); } public void waitForStreamReaderUpdate() throws ExecutionException, InterruptedException {