From 4703fcc28b619b5d594aa751bb60e5bda3998c3c Mon Sep 17 00:00:00 2001 From: Robin Han Date: Thu, 15 Aug 2024 11:01:14 +0800 Subject: [PATCH] fix(issues1798): force expired data block after create timestamp exceed FORCE_EXPIRED_DELTA_MILLS Signed-off-by: Robin Han --- .../com/automq/stream/s3/cache/blockcache/DataBlock.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java index f077c5e1ef..c686351765 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java @@ -22,25 +22,30 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @EventLoopSafe public class DataBlock extends AbstractReferenceCounted { private static final int UNREAD_INIT = -1; + private static final long FORCE_EXPIRED_DELTA_MILLS = TimeUnit.MINUTES.toMillis(4); private final long objectId; private final DataBlockIndex dataBlockIndex; private final CompletableFuture loadCf = new CompletableFuture<>(); private final CompletableFuture freeCf = new CompletableFuture<>(); private final AtomicInteger unreadCnt = new AtomicInteger(UNREAD_INIT); private ObjectReader.DataBlockGroup dataBlockGroup; + private final long createTimestamp; private long lastAccessTimestamp; private final ReadStatusChangeListener listener; private final Time time; - public DataBlock(long objectId, DataBlockIndex dataBlockIndex, ReadStatusChangeListener observeListener, Time time) { + public DataBlock(long objectId, DataBlockIndex dataBlockIndex, ReadStatusChangeListener observeListener, + Time time) { this.objectId = objectId; this.dataBlockIndex = dataBlockIndex; this.listener = observeListener; + this.createTimestamp = time.milliseconds(); this.lastAccessTimestamp = time.milliseconds(); this.time = time; } @@ -109,7 +114,7 @@ public void markRead() { } public boolean isExpired(long expiredTimestamp) { - return lastAccessTimestamp < expiredTimestamp; + return lastAccessTimestamp < expiredTimestamp || (createTimestamp + FORCE_EXPIRED_DELTA_MILLS) < expiredTimestamp; } @Override