Skip to content

Commit

Permalink
feat(issues1087): add cold down time to reset readahead (#1123)
Browse files Browse the repository at this point in the history
* feat(issues1087): add cold down time to reset readahead

Signed-off-by: Robin Han <[email protected]>

* feat(issues1087): change MAX_MERGE_READ_SIZE to 4MB

Signed-off-by: Robin Han <[email protected]>

---------

Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Apr 15, 2024
1 parent 973138c commit 7b9b846
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.stream.InRangeObjects;
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.slf4j.Logger;
Expand Down Expand Up @@ -186,7 +187,11 @@ public long endOffset() {
}

public boolean isObjectExist(long objectId) {
return objectsImage.getObjectMetadata(objectId) != null;
S3Object object = objectsImage.getObjectMetadata(objectId);
if (object == null) {
return false;
}
return object.getS3ObjectState() == S3ObjectState.COMMITTED;
}

// must access thread safe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ public synchronized Map.Entry<K, V> pop() {
return entry;
}

public synchronized Map.Entry<K, V> peek() {
Iterator<Map.Entry<K, V>> it = cacheEntrySet.iterator();
if (it.hasNext()) {
return it.next();
}
return null;
}

public synchronized boolean remove(K key) {
return cache.remove(key) != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.slf4j.Logger;
Expand All @@ -47,6 +48,7 @@ public class StreamReader {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamReader.class);
private static final int DEFAULT_READAHEAD_SIZE = 1024 * 1024 / 2;
private static final int MAX_READAHEAD_SIZE = 32 * 1024 * 1024;
private static final long READAHEAD_RESET_COLD_DOWN_MILLS = TimeUnit.MINUTES.toMillis(1);
private static final LogSuppressor LOG_SUPPRESSOR = new LogSuppressor(LOGGER, 30000);
// visible to test
final NavigableMap<Long, Block> blocksMap = new TreeMap<>();
Expand Down Expand Up @@ -425,13 +427,18 @@ class Readahead {
long nextReadaheadOffset;
int nextReadaheadSize = DEFAULT_READAHEAD_SIZE;
long readaheadMarkOffset;
long resetTimestamp;
boolean requireReset;
private CompletableFuture<Void> inflightReadaheadCf;

public void tryReadahead() {
if (inflightReadaheadCf != null) {
return;
}
if (System.currentTimeMillis() - resetTimestamp < READAHEAD_RESET_COLD_DOWN_MILLS) {
// skip readahead when readahead is in cold down
return;
}
if (requireReset) {
nextReadaheadOffset = 0L;
nextReadaheadSize = DEFAULT_READAHEAD_SIZE;
Expand Down Expand Up @@ -460,6 +467,7 @@ public void tryReadahead() {

public void reset() {
requireReset = true;
resetTimestamp = System.currentTimeMillis();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ <T> CompletableFuture<T> acquireWritePermit(CompletableFuture<T> cf) {
}

static class MergedReadTask {
static final int MAX_MERGE_READ_SIZE = 32 * 1024 * 1024;
static final int MAX_MERGE_READ_SIZE = 4 * 1024 * 1024;
final String path;
final List<ReadTask> readTasks = new ArrayList<>();
long start;
Expand Down

0 comments on commit 7b9b846

Please sign in to comment.