diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java index f077c5e1ef..c686351765 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlock.java @@ -22,25 +22,30 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @EventLoopSafe public class DataBlock extends AbstractReferenceCounted { private static final int UNREAD_INIT = -1; + private static final long FORCE_EXPIRED_DELTA_MILLS = TimeUnit.MINUTES.toMillis(4); private final long objectId; private final DataBlockIndex dataBlockIndex; private final CompletableFuture loadCf = new CompletableFuture<>(); private final CompletableFuture freeCf = new CompletableFuture<>(); private final AtomicInteger unreadCnt = new AtomicInteger(UNREAD_INIT); private ObjectReader.DataBlockGroup dataBlockGroup; + private final long createTimestamp; private long lastAccessTimestamp; private final ReadStatusChangeListener listener; private final Time time; - public DataBlock(long objectId, DataBlockIndex dataBlockIndex, ReadStatusChangeListener observeListener, Time time) { + public DataBlock(long objectId, DataBlockIndex dataBlockIndex, ReadStatusChangeListener observeListener, + Time time) { this.objectId = objectId; this.dataBlockIndex = dataBlockIndex; this.listener = observeListener; + this.createTimestamp = time.milliseconds(); this.lastAccessTimestamp = time.milliseconds(); this.time = time; } @@ -109,7 +114,7 @@ public void markRead() { } public boolean isExpired(long expiredTimestamp) { - return lastAccessTimestamp < expiredTimestamp; + return lastAccessTimestamp < expiredTimestamp || (createTimestamp + FORCE_EXPIRED_DELTA_MILLS) < expiredTimestamp; } @Override