Skip to content

Commit

Permalink
feat(s3stream): speed up block cache get objects (#1167)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Apr 25, 2024
1 parent 81e4e6d commit 30a24c8
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset

for (; streamSetObjectIndex < streamSetObjects.size(); streamSetObjectIndex++) {
S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex);
StreamOffsetRange streamOffsetRange = search(streamSetObject.offsetRangeList(), streamId);
StreamOffsetRange streamOffsetRange = streamSetObject.find(streamId).orElse(null);
// skip the stream set object not containing the stream or the range is before the nextStartOffset
if (streamOffsetRange == null || streamOffsetRange.endOffset() <= nextStartOffset) {
continue;
Expand Down Expand Up @@ -251,14 +251,6 @@ public String toString() {
return "S3StreamsMetadataImage{nextAssignedStreamId=" + nextAssignedStreamId + '}';
}

/**
 * Binary-searches {@code ranges} for the entry whose stream id equals {@code streamId}.
 * <p>
 * Delegates to {@link StreamOffsetRanges#search}, which returns a negative index when the
 * stream id is not present.
 *
 * @param ranges   offset ranges ordered by stream id (required by the underlying search)
 * @param streamId the stream id to look up
 * @return the matching {@link StreamOffsetRange}, or {@code null} if the stream is not in the list
 */
public static StreamOffsetRange search(List<StreamOffsetRange> ranges, long streamId) {
    int index = new StreamOffsetRanges(ranges).search(streamId);
    if (index < 0) {
        // negative index from the ordered-collection search means "not found"
        return null;
    }
    return ranges.get(index);
}

static class StreamOffsetRanges extends AbstractOrderedCollection<Long> {
private final List<StreamOffsetRange> ranges;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,29 @@
import com.google.common.cache.Weigher;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.kafka.common.metadata.S3StreamSetObjectRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.metadata.S3StreamSetObjectRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;

public class S3StreamSetObject implements Comparable<S3StreamSetObject> {
private static final Cache<Long, List<StreamOffsetRange>> RANGES_CACHE = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(1))
.maximumWeight(2500000) // expected max heap occupied size is 75MiB
.weigher((Weigher<Long, List<StreamOffsetRange>>) (key, value) -> value.size())
.build();
.expireAfterAccess(Duration.ofMinutes(1))
.maximumWeight(500000) // expected max heap occupied size is 15MiB
.weigher((Weigher<Long, List<StreamOffsetRange>>) (key, value) -> value.size())
.build();
private static final Cache<Long, Map<Long, StreamOffsetRange>> RANGE_MAP_CACHE = CacheBuilder.newBuilder()
.expireAfterAccess(Duration.ofMinutes(1))
.maximumWeight(2500000) // expected max heap occupied size is 75MiB
.weigher((Weigher<Long, Map<Long, StreamOffsetRange>>) (key, value) -> value.size())
.build();
public static final byte MAGIC = 0x01;
public static final byte ZSTD_COMPRESSED = 1 << 1;
private static final int COMPRESSION_THRESHOLD = 50;
Expand All @@ -60,11 +67,13 @@ public class S3StreamSetObject implements Comparable<S3StreamSetObject> {
private final long dataTimeInMs;

// Only used for testing
public S3StreamSetObject(long objectId, int nodeId, final List<StreamOffsetRange> streamOffsetRanges, long orderId) {
public S3StreamSetObject(long objectId, int nodeId, final List<StreamOffsetRange> streamOffsetRanges,
long orderId) {
this(objectId, nodeId, sortAndEncode(objectId, streamOffsetRanges), orderId, S3StreamConstant.INVALID_TS);
}

public S3StreamSetObject(long objectId, int nodeId, final List<StreamOffsetRange> streamOffsetRanges, long orderId, long dateTimeInMs) {
public S3StreamSetObject(long objectId, int nodeId, final List<StreamOffsetRange> streamOffsetRanges, long orderId,
long dateTimeInMs) {
this(objectId, nodeId, sortAndEncode(objectId, streamOffsetRanges), orderId, dateTimeInMs);
}

Expand All @@ -84,18 +93,33 @@ public List<StreamOffsetRange> offsetRangeList() {
}
}

/**
 * Looks up this object's offset range for the given stream id.
 * <p>
 * The decoded ranges are indexed by stream id in a lazily-built map that is memoized in the
 * shared {@code RANGE_MAP_CACHE} (keyed by object id), so repeated lookups avoid re-decoding
 * {@code ranges} on every call.
 *
 * @param streamId the stream to look up
 * @return the {@link StreamOffsetRange} for the stream, or {@link Optional#empty()} if this
 *         object does not contain the stream
 */
public Optional<StreamOffsetRange> find(long streamId) {
    try {
        Map<Long, StreamOffsetRange> byStreamId = RANGE_MAP_CACHE.get(objectId, () -> {
            // Build the stream-id index once per object; subsequent calls hit the cache.
            List<StreamOffsetRange> decoded = decode(ranges);
            Map<Long, StreamOffsetRange> index = new HashMap<>(decoded.size());
            for (StreamOffsetRange range : decoded) {
                index.put(range.streamId(), range);
            }
            return index;
        });
        return Optional.ofNullable(byStreamId.get(streamId));
    } catch (ExecutionException e) {
        // The loader only decodes in-memory bytes; any failure is unexpected, so surface it
        // as unchecked while preserving the cause chain.
        throw new RuntimeException(e);
    }
}

public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(new S3StreamSetObjectRecord()
.setObjectId(objectId)
.setNodeId(nodeId)
.setOrderId(orderId)
.setDataTimeInMs(dataTimeInMs)
.setRanges(ranges), (short) 0);
.setObjectId(objectId)
.setNodeId(nodeId)
.setOrderId(orderId)
.setDataTimeInMs(dataTimeInMs)
.setRanges(ranges), (short) 0);
}

public static S3StreamSetObject of(S3StreamSetObjectRecord record) {
return new S3StreamSetObject(record.objectId(), record.nodeId(),
record.ranges(), record.orderId(), record.dataTimeInMs());
record.ranges(), record.orderId(), record.dataTimeInMs());
}

public Integer nodeId() {
Expand Down Expand Up @@ -138,11 +162,11 @@ public int hashCode() {
@Override
public String toString() {
return "S3StreamSetObject{" +
"objectId=" + objectId +
", orderId=" + orderId +
", nodeId=" + nodeId +
", dataTimeInMs=" + dataTimeInMs +
'}';
"objectId=" + objectId +
", orderId=" + orderId +
", nodeId=" + nodeId +
", dataTimeInMs=" + dataTimeInMs +
'}';
}

@Override
Expand Down Expand Up @@ -210,4 +234,10 @@ public static List<StreamOffsetRange> decode(byte[] bytes) {
}
return ranges;
}

// Only used for testing
/**
 * Invalidates both static decode caches so unit tests do not observe entries
 * left behind by previously constructed objects (the caches are JVM-wide and
 * keyed by object id).
 */
public static void cleanCache() {
    RANGES_CACHE.invalidateAll();
    RANGE_MAP_CACHE.invalidateAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3StreamSetObject;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -68,6 +69,11 @@ public class S3StreamsMetadataImageTest {
IMAGE2 = S3StreamsMetadataImage.EMPTY;
}

@AfterEach
public void cleanup() {
    // The decode caches in S3StreamSetObject are static (JVM-wide) and keyed by
    // object id; clear them after each test so stale entries from one test cannot
    // leak into the next.
    S3StreamSetObject.cleanCache();
}

@Test
public void testAssignedChange() {
S3StreamsMetadataImage image0 = S3StreamsMetadataImage.EMPTY;
Expand Down

0 comments on commit 30a24c8

Please sign in to comment.