diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index 2265f4606c..57e8918131 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -183,13 +183,13 @@ public CompletableFuture readNodeRangeIndex(long nodeId) { Map streamRangeIndexMap; if (nodeId == BROKER0) { streamRangeIndexMap = Map.of( - STREAM0, new SparseRangeIndex(2, 1, List.of( + STREAM0, new SparseRangeIndex(2, List.of( new RangeIndex(100, 120, 0), new RangeIndex(180, 200, 2), new RangeIndex(520, 600, 4)))); } else { streamRangeIndexMap = Map.of( - STREAM0, new SparseRangeIndex(2, 1, List.of( + STREAM0, new SparseRangeIndex(2, List.of( new RangeIndex(140, 160, 5), new RangeIndex(420, 520, 7), // objectId 8 is not exist (compacted) diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index ee4d72df89..f44a4845db 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -373,7 +373,7 @@ private void compactObjects(List streamMetadataList, List streamRangeIndexMap = new HashMap<>(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); @@ -55,24 +55,22 @@ public class LocalStreamRangeIndexCache implements S3StreamClient.StreamLifeCycl private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("upload-index", true)); private final Queue> uploadQueue = new LinkedList<>(); + private final CompletableFuture initCf = new CompletableFuture<>(); private long nodeId = -1; private ObjectStorage objectStorage; - private CompletableFuture initCf = new CompletableFuture<>(); + private int totalSize = 0; public void start() { executorService.scheduleAtFixedRate(this::batchUpload, 0, 10, TimeUnit.MILLISECONDS); executorService.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.MINUTES); } - // for test - void reset() { - writeLock.lock(); - try { - streamRangeIndexMap.clear(); - initCf = new CompletableFuture<>(); - } finally { - writeLock.unlock(); - } + public int totalSize() { + return totalSize; + } + + CompletableFuture initCf() { + return initCf; } // test only @@ -158,7 +156,8 @@ public void init(int nodeId, ObjectStorage objectStorage) { writeLock.lock(); try { for (Map.Entry> entry : LocalStreamRangeIndexCache.fromBuffer(data).entrySet()) { - this.streamRangeIndexMap.put(entry.getKey(), new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING, entry.getValue())); + this.streamRangeIndexMap.put(entry.getKey(), new SparseRangeIndex(COMPACT_NUM, entry.getValue())); + this.totalSize += entry.getValue().size() * RangeIndex.OBJECT_SIZE; } } finally { writeLock.unlock(); @@ -207,9 +206,10 @@ public CompletableFuture append(Map rangeIndexMap) { for (Map.Entry entry : rangeIndexMap.entrySet()) { long streamId = entry.getKey(); RangeIndex rangeIndex = entry.getValue(); - streamRangeIndexMap.computeIfAbsent(streamId, - k -> new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING)).append(rangeIndex); + totalSize += streamRangeIndexMap.computeIfAbsent(streamId, + k -> new SparseRangeIndex(COMPACT_NUM)).append(rangeIndex); } + evictIfNecessary(); } finally { writeLock.unlock(); } @@ -217,6 +217,33 @@ public CompletableFuture append(Map rangeIndexMap) { }); } + private void evictIfNecessary() { + if (totalSize <= MAX_INDEX_SIZE) { + return; + } + boolean evicted = false; + boolean hasSufficientIndex = true; + List streamRangeIndexList = new ArrayList<>(streamRangeIndexMap.values()); + Collections.shuffle(streamRangeIndexList); + while (totalSize > MAX_INDEX_SIZE) { + // try to evict from each stream in round-robin manner + for (SparseRangeIndex sparseRangeIndex : streamRangeIndexList) { + if (sparseRangeIndex.length() <= 1 + COMPACT_NUM && hasSufficientIndex) { + // skip evict if there is still sufficient stream to be evicted + continue; + } + totalSize -= sparseRangeIndex.evictOnce(); + evicted = true; + if (totalSize <= MAX_INDEX_SIZE) { + break; + } + } + if (!evicted) { + hasSufficientIndex = false; + } + } + } + public CompletableFuture compact(Map rangeIndexMap, Set compactedObjectIds) { return exec(() -> { writeLock.lock(); @@ -225,8 +252,8 @@ public CompletableFuture compact(Map rangeIndexMap, Set< Iterator> iterator = streamRangeIndexMap.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); - entry.getValue().compact(null, compactedObjectIds); - if (entry.getValue().size() == 0) { + totalSize += entry.getValue().compact(null, compactedObjectIds); + if (entry.getValue().length() == 0) { iterator.remove(); } } @@ -237,10 +264,10 @@ public CompletableFuture compact(Map rangeIndexMap, Set< RangeIndex rangeIndex = entry.getValue(); streamRangeIndexMap.compute(streamId, (k, v) -> { if (v == null) { - v = new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING); + v = new SparseRangeIndex(COMPACT_NUM); } - v.compact(rangeIndex, compactedObjectIds); - if (v.size() == 0) { + totalSize += v.compact(rangeIndex, compactedObjectIds); + if (v.length() == 0) { // remove stream with empty index return null; } @@ -270,11 +297,7 @@ public CompletableFuture updateIndexFromRequest(CommitStreamSetObjectReque } public static ByteBuf toBuffer(Map streamRangeIndexMap) { - int capacity = Short.BYTES // version - + Integer.BYTES // stream num - + streamRangeIndexMap.values().stream().mapToInt(index -> Long.BYTES // stream id - + Integer.BYTES // range index num - + index.getRangeIndexList().size() * (3 * Long.BYTES)).sum(); + int capacity = bufferSize(streamRangeIndexMap); ByteBuf buffer = ByteBufAlloc.byteBuffer(capacity); try { buffer.writeShort(VERSION); @@ -295,6 +318,14 @@ public static ByteBuf toBuffer(Map streamRangeIndexMap) return buffer; } + private static int bufferSize(Map streamRangeIndexMap) { + return Short.BYTES // version + + Integer.BYTES // stream num + + streamRangeIndexMap.values().stream().mapToInt(index -> Long.BYTES // stream id + + Integer.BYTES // range index num + + index.getRangeIndexList().size() * (3 * Long.BYTES)).sum(); + } + public static Map> fromBuffer(ByteBuf data) { Map> rangeIndexMap = new HashMap<>(); short version = data.readShort(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/index/SparseRangeIndex.java b/s3stream/src/main/java/com/automq/stream/s3/index/SparseRangeIndex.java index d460e2e9dd..1a05393812 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/index/SparseRangeIndex.java +++ b/s3stream/src/main/java/com/automq/stream/s3/index/SparseRangeIndex.java @@ -22,44 +22,64 @@ public class SparseRangeIndex { private static final Logger LOGGER = LoggerFactory.getLogger(SparseRangeIndex.class); private final int compactNum; - private final int sparsePadding; // sorted by startOffset in descending order private List sortedRangeIndexList; - private long evictIndex = 0; + private int size = 0; + private int evictIndex = 0; - public SparseRangeIndex(int compactNum, int sparsePadding) { - this(compactNum, sparsePadding, new ArrayList<>()); + public SparseRangeIndex(int compactNum) { + this(compactNum, new ArrayList<>()); } - public SparseRangeIndex(int compactNum, int sparsePadding, List sortedRangeIndexList) { + public SparseRangeIndex(int compactNum, List sortedRangeIndexList) { this.compactNum = compactNum; - this.sparsePadding = sparsePadding; + init(sortedRangeIndexList); + } + + private void init(List sortedRangeIndexList) { + if (sortedRangeIndexList == null) { + sortedRangeIndexList = new ArrayList<>(); + } this.sortedRangeIndexList = sortedRangeIndexList; + this.size = sortedRangeIndexList.size() * RangeIndex.OBJECT_SIZE; } - public void append(RangeIndex newRangeIndex) { + /** + * Append new range index to the list. + * @param newRangeIndex the range index to append + * @return the change of size after appending + */ + public int append(RangeIndex newRangeIndex) { + int delta = 0; if (newRangeIndex == null) { - return; + return delta; } if (!this.sortedRangeIndexList.isEmpty() && newRangeIndex.compareTo(this.sortedRangeIndexList.get(this.sortedRangeIndexList.size() - 1)) <= 0) { LOGGER.error("Unexpected new range index {}, last: {}, maybe initialized with outdated index file, " + "reset local cache", newRangeIndex, this.sortedRangeIndexList.get(this.sortedRangeIndexList.size() - 1)); + delta -= size; reset(); } this.sortedRangeIndexList.add(newRangeIndex); - evict(); + size += RangeIndex.OBJECT_SIZE; + return delta + RangeIndex.OBJECT_SIZE; } public void reset() { - this.sortedRangeIndexList.clear(); - evictIndex = 0; + init(new ArrayList<>()); } - public void compact(RangeIndex newRangeIndex, Set compactedObjectIds) { + /** + * Compact the list by removing the compacted object ids and add the new range index if not null. + * + * @param newRangeIndex the new range index to add + * @param compactedObjectIds the object ids to compact + * @return the change of size after compacting + */ + public int compact(RangeIndex newRangeIndex, Set compactedObjectIds) { if (compactedObjectIds.isEmpty()) { - append(newRangeIndex); - return; + return append(newRangeIndex); } List newRangeIndexList = new ArrayList<>(); boolean found = false; @@ -68,29 +88,70 @@ public void compact(RangeIndex newRangeIndex, Set compactedObjectIds) { continue; } if (newRangeIndex != null && !found && rangeIndex.compareTo(newRangeIndex) > 0) { + // insert new range index into the list newRangeIndexList.add(newRangeIndex); found = true; } newRangeIndexList.add(rangeIndex); } if (newRangeIndex != null && !found) { + // insert new range index into the end of the list newRangeIndexList.add(newRangeIndex); } - this.sortedRangeIndexList = newRangeIndexList; + int oldSize = size; + init(newRangeIndexList); + return size - oldSize; } - private void evict() { - if (this.sortedRangeIndexList.size() > this.compactNum) { - if (evictIndex++ % (sparsePadding + 1) == 0) { - this.sortedRangeIndexList.remove(this.sortedRangeIndexList.size() - this.compactNum - 1); + /** + * Try to evict one range index from the list, the eviction priority for each element is: + * 1. any element that's not the first and last N compacted elements + * 2. the last N compacted elements + * 3. the first element + *

+ * For example for a list of [0, 1, 2, 3, 4, 5], compact number is 2, the eviction result will be: + *

    + *
  • 1rst: [0, 2, 3, 4, 5]
  • + *
  • 2nd: [0, 2, 4, 5]
  • + *
  • 3rd: [0, 4, 5]
  • + *
  • 4th: [0, 5]
  • + *
  • 5th: [0]
  • + *
  • 6th: []
  • + *
+ * + * @return evicted size + */ + public int evictOnce() { + int indexToEvict = -1; + if (this.sortedRangeIndexList.isEmpty()) { + return 0; + } else if (this.sortedRangeIndexList.size() == 1) { + // evict the only element + indexToEvict = 0; + } else if (this.sortedRangeIndexList.size() <= (1 + compactNum)) { + indexToEvict = 1; + } + + if (indexToEvict == -1) { + if (evictIndex % this.sortedRangeIndexList.size() == 0 + || this.evictIndex >= this.sortedRangeIndexList.size() - compactNum) { + this.evictIndex = 1; } + indexToEvict = evictIndex++; } + this.sortedRangeIndexList.remove(indexToEvict); + size -= RangeIndex.OBJECT_SIZE; + return RangeIndex.OBJECT_SIZE; } - public int size() { + public int length() { return this.sortedRangeIndexList.size(); } + public int size() { + return size; + } + List getRangeIndexList() { return this.sortedRangeIndexList; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java index ac7a3a0e75..17b8270a10 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java @@ -120,7 +120,7 @@ protected void newLargeObjectWriter(WriteOptions writeOptions, AbstractObjectSto } class ObjectWriter implements Writer { - // max upload size, when object data size is larger MAX_UPLOAD_SIZE, we should use multi-part upload to upload it. + // max upload size, when object data size is larger than MAX_UPLOAD_SIZE, we should use multi-part upload to upload it. static final long MAX_UPLOAD_SIZE = 32L * 1024 * 1024; CompletableFuture cf = new CompletableFuture<>(); CompositeByteBuf data = ByteBufAlloc.compositeByteBuffer(); diff --git a/s3stream/src/test/java/com/automq/stream/s3/index/LocalStreamRangeIndexCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/index/LocalStreamRangeIndexCacheTest.java index 1ac6e2b0eb..9efefeca25 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/index/LocalStreamRangeIndexCacheTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/index/LocalStreamRangeIndexCacheTest.java @@ -44,6 +44,8 @@ public void testInit() { cache = new LocalStreamRangeIndexCache(); cache.start(); cache.init(NODE_0, objectStorage); + cache.initCf().join(); + Assertions.assertEquals(RangeIndex.OBJECT_SIZE, cache.totalSize()); Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 0).join()); Assertions.assertEquals(88, cache.searchObjectId(STREAM_0, 50).join()); Assertions.assertEquals(88, cache.searchObjectId(STREAM_0, 70).join()); @@ -64,29 +66,41 @@ public void testAppend() { cache.updateIndexFromRequest(request).join(); startOffset += 100; } - Assertions.assertEquals(7, cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().size()); - Assertions.assertEquals(new RangeIndex(150, 250, 89), - cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().get(0)); - Assertions.assertEquals(new RangeIndex(350, 450, 91), - cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().get(1)); - Assertions.assertEquals(new RangeIndex(550, 650, 93), - cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().get(2)); - Assertions.assertEquals(new RangeIndex(650, 750, 94), - cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().get(3)); - Assertions.assertEquals(new RangeIndex(750, 850, 95), - cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().get(4)); - Assertions.assertEquals(new RangeIndex(850, 950, 96), - cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().get(5)); - Assertions.assertEquals(new RangeIndex(950, 1050, 97), - cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().get(6)); + Assertions.assertEquals(10, cache.getStreamRangeIndexMap().get(STREAM_0).length()); Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 0).join()); - Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 50).join()); + Assertions.assertEquals(88, cache.searchObjectId(STREAM_0, 50).join()); + Assertions.assertEquals(88, cache.searchObjectId(STREAM_0, 100).join()); Assertions.assertEquals(89, cache.searchObjectId(STREAM_0, 150).join()); Assertions.assertEquals(93, cache.searchObjectId(STREAM_0, 600).join()); Assertions.assertEquals(97, cache.searchObjectId(STREAM_0, 950).join()); Assertions.assertEquals(97, cache.searchObjectId(STREAM_0, 1500).join()); } + @Test + public void testEvict() { + ObjectStorage objectStorage = new MemoryObjectStorage(); + LocalStreamRangeIndexCache cache = new LocalStreamRangeIndexCache(); + cache.start(); + cache.init(NODE_0, objectStorage); + int streamNum = 500; + int maxRangeIndexNum = 2000; + MockRandom r = new MockRandom(); + int objectId = 0; + for (int i = 0; i < streamNum; i++) { + int rangeIndexNum = r.nextInt(maxRangeIndexNum); + int startOffset = 0; + for (int j = 0; j < rangeIndexNum; j++) { + CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest(); + request.setObjectId(objectId++); + request.setStreamRanges(List.of(new ObjectStreamRange(i, 0, startOffset, startOffset + 100, 100))); + cache.updateIndexFromRequest(request).join(); + startOffset += 100; + } + } + Assertions.assertEquals(streamNum, cache.getStreamRangeIndexMap().size()); + Assertions.assertTrue(cache.totalSize() <= LocalStreamRangeIndexCache.MAX_INDEX_SIZE); + } + @Test public void testCompact() { ObjectStorage objectStorage = new MemoryObjectStorage(); @@ -101,12 +115,14 @@ public void testCompact() { cache.updateIndexFromRequest(request).join(); startOffset += 100; } - Assertions.assertEquals(7, cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().size()); + Assertions.assertEquals(10, cache.getStreamRangeIndexMap().get(STREAM_0).length()); + Assertions.assertEquals(10 * RangeIndex.OBJECT_SIZE, cache.totalSize()); request.setObjectId(256); request.setStreamRanges(List.of(new ObjectStreamRange(STREAM_0, 0, 50, 650, 1000))); request.setCompactedObjectIds(List.of(88L, 89L, 90L, 91L, 92L, 93L)); cache.updateIndexFromRequest(request).join(); Assertions.assertEquals(5, cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().size()); + Assertions.assertEquals(5 * RangeIndex.OBJECT_SIZE, cache.totalSize()); Assertions.assertEquals(new RangeIndex(50, 650, 256), cache.getStreamRangeIndexMap().get(STREAM_0).getRangeIndexList().get(0)); Assertions.assertEquals(new RangeIndex(650, 750, 94), @@ -129,6 +145,7 @@ public void testCompact() { request.setCompactedObjectIds(List.of(256L, 94L, 95L, 96L, 97L)); cache.updateIndexFromRequest(request).join(); Assertions.assertTrue(cache.getStreamRangeIndexMap().isEmpty()); + Assertions.assertEquals(0, cache.totalSize()); Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 300).join()); } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/index/MockRandom.java b/s3stream/src/test/java/com/automq/stream/s3/index/MockRandom.java new file mode 100644 index 0000000000..f0c1134ed2 --- /dev/null +++ b/s3stream/src/test/java/com/automq/stream/s3/index/MockRandom.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.index; + +import java.util.Random; + +public class MockRandom extends Random { + private long state; + + public MockRandom() { + this(17); + } + + public MockRandom(long state) { + this.state = state; + } + + @Override + protected int next(int bits) { + state = (state * 2862933555777941757L) + 3037000493L; + return (int) (state >>> (64 - bits)); + } +} diff --git a/s3stream/src/test/java/com/automq/stream/s3/index/SparseRangeIndexTest.java b/s3stream/src/test/java/com/automq/stream/s3/index/SparseRangeIndexTest.java index 26bcfb4ce4..e0852f0c94 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/index/SparseRangeIndexTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/index/SparseRangeIndexTest.java @@ -13,7 +13,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Set; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -21,51 +20,109 @@ public class SparseRangeIndexTest { @Test public void testAppend() { - int compactNum = 5; - int sparsePadding = 1; - SparseRangeIndex sparseRangeIndex = new SparseRangeIndex(compactNum, sparsePadding); + int totalSize = 6; + int compactNum = 2; + SparseRangeIndex sparseRangeIndex = new SparseRangeIndex(compactNum); int nextStartOffset = 0; List originList = new ArrayList<>(); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < totalSize; i++) { RangeIndex rangeIndex = new RangeIndex(nextStartOffset, nextStartOffset + 10, i); sparseRangeIndex.append(rangeIndex); originList.add(rangeIndex); nextStartOffset += 10; } + // test append out of order range - sparseRangeIndex.append(new RangeIndex(0, 10, 0)); - Assertions.assertEquals(1, sparseRangeIndex.size()); + int delta = sparseRangeIndex.append(new RangeIndex(0, 10, 0)); + Assertions.assertEquals(-RangeIndex.OBJECT_SIZE * (totalSize - 1), delta); + Assertions.assertEquals(1, sparseRangeIndex.length()); + Assertions.assertEquals(RangeIndex.OBJECT_SIZE, sparseRangeIndex.size()); nextStartOffset = 10; - for (int i = 1; i < 10; i++) { + for (int i = 1; i < totalSize; i++) { RangeIndex rangeIndex = new RangeIndex(nextStartOffset, nextStartOffset + 10, i); sparseRangeIndex.append(rangeIndex); nextStartOffset += 10; } - Assertions.assertEquals(7, sparseRangeIndex.size()); - List rangeIndexList = sparseRangeIndex.getRangeIndexList(); - checkOrder(rangeIndexList); - for (int i = 0; i < originList.size(); i++) { - if (i >= originList.size() - compactNum || i % 2 != 0) { - Assertions.assertTrue(rangeIndexList.contains(originList.get(i))); - } else { - Assertions.assertFalse(rangeIndexList.contains(originList.get(i))); - } - } + Assertions.assertEquals(totalSize, sparseRangeIndex.length()); + Assertions.assertEquals(originList, sparseRangeIndex.getRangeIndexList()); - RangeIndex index0 = rangeIndexList.get(0); - RangeIndex index1 = rangeIndexList.get(1); - RangeIndex newRangeIndex = new RangeIndex(index0.getStartOffset(), index1.getEndOffset(), 10); - sparseRangeIndex.compact(newRangeIndex, Set.of(index0.getObjectId(), index1.getObjectId())); - rangeIndexList = sparseRangeIndex.getRangeIndexList(); - Assertions.assertEquals(6, rangeIndexList.size()); - checkOrder(rangeIndexList); - } + // init: 0, 1, 2, 3, 4, 5 + // test evict 1rst + // 0, 2, 3, 4, 5 + int expectedLength = totalSize - 1; + Assertions.assertEquals(RangeIndex.OBJECT_SIZE, sparseRangeIndex.evictOnce()); + Assertions.assertEquals(expectedLength, sparseRangeIndex.length()); + Assertions.assertEquals(RangeIndex.OBJECT_SIZE * expectedLength, sparseRangeIndex.size()); + List expectedList = new ArrayList<>(originList); + expectedList.remove(originList.get(1)); + Assertions.assertEquals(expectedList, sparseRangeIndex.getRangeIndexList()); - private void checkOrder(List rangeIndexList) { - for (int i = 0; i < rangeIndexList.size() - 1; i++) { - Assertions.assertTrue(rangeIndexList.get(i).compareTo(rangeIndexList.get(i + 1)) < 0); - } + // test evict 2nd + // 0, 2, 4, 5 + expectedLength = totalSize - 2; + Assertions.assertEquals(RangeIndex.OBJECT_SIZE, sparseRangeIndex.evictOnce()); + Assertions.assertEquals(expectedLength, sparseRangeIndex.length()); + Assertions.assertEquals(RangeIndex.OBJECT_SIZE * expectedLength, sparseRangeIndex.size()); + expectedList = new ArrayList<>(originList); + expectedList.remove(originList.get(1)); + expectedList.remove(originList.get(3)); + Assertions.assertEquals(expectedList, sparseRangeIndex.getRangeIndexList()); + + // test evict 3rd + // 0, 4, 5 + expectedLength = totalSize - 3; + Assertions.assertEquals(RangeIndex.OBJECT_SIZE, sparseRangeIndex.evictOnce()); + Assertions.assertEquals(expectedLength, sparseRangeIndex.length()); + Assertions.assertEquals(RangeIndex.OBJECT_SIZE * expectedLength, sparseRangeIndex.size()); + expectedList = new ArrayList<>(originList); + expectedList.remove(originList.get(1)); + expectedList.remove(originList.get(3)); + expectedList.remove(originList.get(2)); + Assertions.assertEquals(expectedList, sparseRangeIndex.getRangeIndexList()); + + // test evict 4th + // 0, 5 + expectedLength = totalSize - 4; + Assertions.assertEquals(RangeIndex.OBJECT_SIZE, sparseRangeIndex.evictOnce()); + Assertions.assertEquals(expectedLength, sparseRangeIndex.length()); + Assertions.assertEquals(RangeIndex.OBJECT_SIZE * expectedLength, sparseRangeIndex.size()); + expectedList = new ArrayList<>(originList); + expectedList.remove(originList.get(1)); + expectedList.remove(originList.get(3)); + expectedList.remove(originList.get(2)); + expectedList.remove(originList.get(4)); + Assertions.assertEquals(expectedList, sparseRangeIndex.getRangeIndexList()); + + // test evict 5th + // 0 + expectedLength = totalSize - 5; + Assertions.assertEquals(RangeIndex.OBJECT_SIZE, sparseRangeIndex.evictOnce()); + Assertions.assertEquals(expectedLength, sparseRangeIndex.length()); + Assertions.assertEquals(RangeIndex.OBJECT_SIZE * expectedLength, sparseRangeIndex.size()); + expectedList = new ArrayList<>(originList); + expectedList.remove(originList.get(1)); + expectedList.remove(originList.get(3)); + expectedList.remove(originList.get(2)); + expectedList.remove(originList.get(4)); + expectedList.remove(originList.get(5)); + Assertions.assertEquals(expectedList, sparseRangeIndex.getRangeIndexList()); + + // test evict 6th + expectedLength = 0; + Assertions.assertEquals(RangeIndex.OBJECT_SIZE, sparseRangeIndex.evictOnce()); + Assertions.assertEquals(expectedLength, sparseRangeIndex.length()); + Assertions.assertEquals(0, sparseRangeIndex.size()); + expectedList = new ArrayList<>(originList); + expectedList.remove(originList.get(1)); + expectedList.remove(originList.get(3)); + expectedList.remove(originList.get(2)); + expectedList.remove(originList.get(4)); + expectedList.remove(originList.get(5)); + expectedList.remove(originList.get(0)); + Assertions.assertEquals(expectedList, sparseRangeIndex.getRangeIndexList()); + + Assertions.assertEquals(0, sparseRangeIndex.evictOnce()); } }