diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index a024168c10..1983add90e 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -146,7 +146,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset for (; streamSetObjectIndex < streamSetObjects.size(); streamSetObjectIndex++) { S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex); - StreamOffsetRange streamOffsetRange = search(streamSetObject.offsetRangeList(), streamId); + StreamOffsetRange streamOffsetRange = streamSetObject.find(streamId).orElse(null); // skip the stream set object not containing the stream or the range is before the nextStartOffset if (streamOffsetRange == null || streamOffsetRange.endOffset() <= nextStartOffset) { continue; @@ -251,14 +251,6 @@ public String toString() { return "S3StreamsMetadataImage{nextAssignedStreamId=" + nextAssignedStreamId + '}'; } - public static StreamOffsetRange search(List ranges, long streamId) { - int index = new StreamOffsetRanges(ranges).search(streamId); - if (index < 0) { - return null; - } - return ranges.get(index); - } - static class StreamOffsetRanges extends AbstractOrderedCollection { private final List ranges; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java index 718b2cd466..387cc8fff7 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java @@ -26,22 +26,29 @@ import com.google.common.cache.Weigher; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import org.apache.kafka.common.metadata.S3StreamSetObjectRecord; -import org.apache.kafka.server.common.ApiMessageAndVersion; - import java.time.Duration; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ExecutionException; +import org.apache.kafka.common.metadata.S3StreamSetObjectRecord; +import org.apache.kafka.server.common.ApiMessageAndVersion; public class S3StreamSetObject implements Comparable { private static final Cache> RANGES_CACHE = CacheBuilder.newBuilder() - .expireAfterAccess(Duration.ofMinutes(1)) - .maximumWeight(2500000) // expected max heap occupied size is 75MiB - .weigher((Weigher>) (key, value) -> value.size()) - .build(); + .expireAfterAccess(Duration.ofMinutes(1)) + .maximumWeight(500000) // expected max heap occupied size is 15MiB + .weigher((Weigher>) (key, value) -> value.size()) + .build(); + private static final Cache> RANGE_MAP_CACHE = CacheBuilder.newBuilder() + .expireAfterAccess(Duration.ofMinutes(1)) + .maximumWeight(2500000) // expected max heap occupied size is 75MiB + .weigher((Weigher>) (key, value) -> value.size()) + .build(); public static final byte MAGIC = 0x01; public static final byte ZSTD_COMPRESSED = 1 << 1; private static final int COMPRESSION_THRESHOLD = 50; @@ -60,11 +67,13 @@ public class S3StreamSetObject implements Comparable { private final long dataTimeInMs; // Only used for testing - public S3StreamSetObject(long objectId, int nodeId, final List streamOffsetRanges, long orderId) { + public S3StreamSetObject(long objectId, int nodeId, final List streamOffsetRanges, + long orderId) { this(objectId, nodeId, sortAndEncode(objectId, streamOffsetRanges), orderId, S3StreamConstant.INVALID_TS); } - public S3StreamSetObject(long objectId, int nodeId, final List streamOffsetRanges, long orderId, long dateTimeInMs) { + public S3StreamSetObject(long objectId, int nodeId, final List streamOffsetRanges, long orderId, + long dateTimeInMs) { this(objectId, nodeId, sortAndEncode(objectId, streamOffsetRanges), orderId, dateTimeInMs); } @@ -84,18 +93,33 @@ public List offsetRangeList() { } } + public Optional find(long streamId) { + try { + return Optional.ofNullable(RANGE_MAP_CACHE.get(objectId, () -> { + Map rangeMap = new HashMap<>(); + List rangeList = decode(ranges); + for (StreamOffsetRange range : rangeList) { + rangeMap.put(range.streamId(), range); + } + return rangeMap; + }).get(streamId)); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + public ApiMessageAndVersion toRecord() { return new ApiMessageAndVersion(new S3StreamSetObjectRecord() - .setObjectId(objectId) - .setNodeId(nodeId) - .setOrderId(orderId) - .setDataTimeInMs(dataTimeInMs) - .setRanges(ranges), (short) 0); + .setObjectId(objectId) + .setNodeId(nodeId) + .setOrderId(orderId) + .setDataTimeInMs(dataTimeInMs) + .setRanges(ranges), (short) 0); } public static S3StreamSetObject of(S3StreamSetObjectRecord record) { return new S3StreamSetObject(record.objectId(), record.nodeId(), - record.ranges(), record.orderId(), record.dataTimeInMs()); + record.ranges(), record.orderId(), record.dataTimeInMs()); } public Integer nodeId() { @@ -138,11 +162,11 @@ public int hashCode() { @Override public String toString() { return "S3StreamSetObject{" + - "objectId=" + objectId + - ", orderId=" + orderId + - ", nodeId=" + nodeId + - ", dataTimeInMs=" + dataTimeInMs + - '}'; + "objectId=" + objectId + + ", orderId=" + orderId + + ", nodeId=" + nodeId + + ", dataTimeInMs=" + dataTimeInMs + + '}'; } @Override @@ -210,4 +234,10 @@ public static List decode(byte[] bytes) { } return ranges; } + + // Only used for testing + public static void cleanCache() { + RANGES_CACHE.invalidateAll(); + RANGE_MAP_CACHE.invalidateAll(); + } } diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index 0bb7c1f66c..6d18692bf3 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3StreamSetObject; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -68,6 +69,11 @@ public class S3StreamsMetadataImageTest { IMAGE2 = S3StreamsMetadataImage.EMPTY; } + @AfterEach + public void cleanup() { + S3StreamSetObject.cleanCache(); + } + @Test public void testAssignedChange() { S3StreamsMetadataImage image0 = S3StreamsMetadataImage.EMPTY;