Skip to content

Commit

Permalink
feat(metadata): limit the size of sparse index cache (#1833)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Aug 19, 2024
1 parent 34ba11e commit 2214e86
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,13 @@ public CompletableFuture<ByteBuf> readNodeRangeIndex(long nodeId) {
Map<Long, SparseRangeIndex> streamRangeIndexMap;
if (nodeId == BROKER0) {
streamRangeIndexMap = Map.of(
STREAM0, new SparseRangeIndex(2, 1, List.of(
STREAM0, new SparseRangeIndex(2, List.of(
new RangeIndex(100, 120, 0),
new RangeIndex(180, 200, 2),
new RangeIndex(520, 600, 4))));
} else {
streamRangeIndexMap = Map.of(
STREAM0, new SparseRangeIndex(2, 1, List.of(
STREAM0, new SparseRangeIndex(2, List.of(
new RangeIndex(140, 160, 5),
new RangeIndex(420, 520, 7),
// objectId 8 is not exist (compacted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3Obje
logger.info("No stream set objects to compact");
return;
}
logger.info("Build compact request for {} stream set objects complete, stream set object id: {}, stresam set object size: {}, stream object num: {}, time cost: {}, start committing objects",
logger.info("Build compact request for {} stream set objects complete, stream set object id: {}, stream set object size: {}, stream object num: {}, time cost: {}, start committing objects",
request.getCompactedObjectIds().size(), request.getObjectId(), request.getObjectSize(), request.getStreamObjects().size(), timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
timerUtil.reset();
objectManager.commitStreamSetObject(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,31 @@
public class LocalStreamRangeIndexCache implements S3StreamClient.StreamLifeCycleListener {
private static final short VERSION = 0;
private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamRangeIndexCache.class);
private static final int COMPACT_NUM = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_COMPACT_NUM", 5);
private static final int SPARSE_PADDING = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_SPARSE_PADDING", 1);
private static final int COMPACT_NUM = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_COMPACT_NUM", 3);
public static final int MAX_INDEX_SIZE = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_MAX_SIZE", 5 * 1024 * 1024);
private final Map<Long, SparseRangeIndex> streamRangeIndexMap = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("upload-index", true));
private final Queue<CompletableFuture<Void>> uploadQueue = new LinkedList<>();
private final CompletableFuture<Void> initCf = new CompletableFuture<>();
private long nodeId = -1;
private ObjectStorage objectStorage;
private CompletableFuture<Void> initCf = new CompletableFuture<>();
private int totalSize = 0;

/**
 * Start the background tasks on the single-threaded scheduler:
 * drain the upload queue via {@code batchUpload} every 10 ms, and
 * run {@code flush} once per minute (initial delay 1 minute).
 */
public void start() {
    executorService.scheduleAtFixedRate(this::batchUpload, 0, 10, TimeUnit.MILLISECONDS);
    executorService.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.MINUTES);
}

// for test
void reset() {
writeLock.lock();
try {
streamRangeIndexMap.clear();
initCf = new CompletableFuture<>();
} finally {
writeLock.unlock();
}
/**
 * Total estimated size in bytes of all cached range indexes.
 * NOTE(review): reads {@code totalSize} without taking {@code readLock};
 * presumably a slightly stale value is acceptable here — confirm.
 */
public int totalSize() {
    return totalSize;
}

// Future that completes once the cache has been initialized (see init()).
CompletableFuture<Void> initCf() {
    return initCf;
}

// test only
Expand Down Expand Up @@ -158,7 +156,8 @@ public void init(int nodeId, ObjectStorage objectStorage) {
writeLock.lock();
try {
for (Map.Entry<Long, List<RangeIndex>> entry : LocalStreamRangeIndexCache.fromBuffer(data).entrySet()) {
this.streamRangeIndexMap.put(entry.getKey(), new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING, entry.getValue()));
this.streamRangeIndexMap.put(entry.getKey(), new SparseRangeIndex(COMPACT_NUM, entry.getValue()));
this.totalSize += entry.getValue().size() * RangeIndex.OBJECT_SIZE;
}
} finally {
writeLock.unlock();
Expand Down Expand Up @@ -207,16 +206,44 @@ public CompletableFuture<Void> append(Map<Long, RangeIndex> rangeIndexMap) {
for (Map.Entry<Long, RangeIndex> entry : rangeIndexMap.entrySet()) {
long streamId = entry.getKey();
RangeIndex rangeIndex = entry.getValue();
streamRangeIndexMap.computeIfAbsent(streamId,
k -> new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING)).append(rangeIndex);
totalSize += streamRangeIndexMap.computeIfAbsent(streamId,
k -> new SparseRangeIndex(COMPACT_NUM)).append(rangeIndex);
}
evictIfNecessary();
} finally {
writeLock.unlock();
}
return null;
});
}

/**
 * Evict range index entries until the total cached size fits within {@code MAX_INDEX_SIZE}.
 * Streams are shuffled once and then visited round-robin so eviction pressure is spread
 * evenly. Streams already down to at most (1 + COMPACT_NUM) entries are spared as long as
 * other streams still have entries to give up; once no "sufficient" stream remains, the
 * constraint is relaxed and small streams are evicted too.
 * <p>
 * NOTE(review): must be invoked with {@code writeLock} held (as done in {@code append}).
 */
private void evictIfNecessary() {
    if (totalSize <= MAX_INDEX_SIZE) {
        return;
    }
    boolean hasSufficientIndex = true;
    List<SparseRangeIndex> streamRangeIndexList = new ArrayList<>(streamRangeIndexMap.values());
    Collections.shuffle(streamRangeIndexList);
    while (totalSize > MAX_INDEX_SIZE) {
        // Reset per pass. If this flag carried over from a previous pass, a stale 'true'
        // would keep hasSufficientIndex from ever flipping, and the loop would spin forever
        // once only small streams (length <= 1 + COMPACT_NUM) remain over the size limit.
        boolean evicted = false;
        // try to evict from each stream in round-robin manner
        for (SparseRangeIndex sparseRangeIndex : streamRangeIndexList) {
            if (sparseRangeIndex.length() <= 1 + COMPACT_NUM && hasSufficientIndex) {
                // skip eviction while there are still streams with sufficient entries
                continue;
            }
            totalSize -= sparseRangeIndex.evictOnce();
            evicted = true;
            if (totalSize <= MAX_INDEX_SIZE) {
                break;
            }
        }
        if (!evicted) {
            if (!hasSufficientIndex) {
                // even the relaxed pass freed nothing (all indexes empty); stop to avoid spinning
                break;
            }
            // relax the constraint: next pass may evict from small streams as well
            hasSufficientIndex = false;
        }
    }
}

public CompletableFuture<Void> compact(Map<Long, RangeIndex> rangeIndexMap, Set<Long> compactedObjectIds) {
return exec(() -> {
writeLock.lock();
Expand All @@ -225,8 +252,8 @@ public CompletableFuture<Void> compact(Map<Long, RangeIndex> rangeIndexMap, Set<
Iterator<Map.Entry<Long, SparseRangeIndex>> iterator = streamRangeIndexMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, SparseRangeIndex> entry = iterator.next();
entry.getValue().compact(null, compactedObjectIds);
if (entry.getValue().size() == 0) {
totalSize += entry.getValue().compact(null, compactedObjectIds);
if (entry.getValue().length() == 0) {
iterator.remove();
}
}
Expand All @@ -237,10 +264,10 @@ public CompletableFuture<Void> compact(Map<Long, RangeIndex> rangeIndexMap, Set<
RangeIndex rangeIndex = entry.getValue();
streamRangeIndexMap.compute(streamId, (k, v) -> {
if (v == null) {
v = new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING);
v = new SparseRangeIndex(COMPACT_NUM);
}
v.compact(rangeIndex, compactedObjectIds);
if (v.size() == 0) {
totalSize += v.compact(rangeIndex, compactedObjectIds);
if (v.length() == 0) {
// remove stream with empty index
return null;
}
Expand Down Expand Up @@ -270,11 +297,7 @@ public CompletableFuture<Void> updateIndexFromRequest(CommitStreamSetObjectReque
}

public static ByteBuf toBuffer(Map<Long, SparseRangeIndex> streamRangeIndexMap) {
int capacity = Short.BYTES // version
+ Integer.BYTES // stream num
+ streamRangeIndexMap.values().stream().mapToInt(index -> Long.BYTES // stream id
+ Integer.BYTES // range index num
+ index.getRangeIndexList().size() * (3 * Long.BYTES)).sum();
int capacity = bufferSize(streamRangeIndexMap);
ByteBuf buffer = ByteBufAlloc.byteBuffer(capacity);
try {
buffer.writeShort(VERSION);
Expand All @@ -295,6 +318,14 @@ public static ByteBuf toBuffer(Map<Long, SparseRangeIndex> streamRangeIndexMap)
return buffer;
}

/**
 * Compute the exact serialized size in bytes of the given stream-to-index map,
 * matching the layout written by {@code toBuffer}.
 */
private static int bufferSize(Map<Long, SparseRangeIndex> streamRangeIndexMap) {
    int total = Short.BYTES // version
        + Integer.BYTES; // stream num
    for (SparseRangeIndex index : streamRangeIndexMap.values()) {
        total += Long.BYTES // stream id
            + Integer.BYTES // range index num
            + index.getRangeIndexList().size() * (3 * Long.BYTES); // start/end offset + object id
    }
    return total;
}

public static Map<Long, List<RangeIndex>> fromBuffer(ByteBuf data) {
Map<Long, List<RangeIndex>> rangeIndexMap = new HashMap<>();
short version = data.readShort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,64 @@
public class SparseRangeIndex {
private static final Logger LOGGER = LoggerFactory.getLogger(SparseRangeIndex.class);
private final int compactNum;
private final int sparsePadding;
// sorted by startOffset in descending order
private List<RangeIndex> sortedRangeIndexList;
private long evictIndex = 0;
private int size = 0;
private int evictIndex = 0;

public SparseRangeIndex(int compactNum, int sparsePadding) {
this(compactNum, sparsePadding, new ArrayList<>());
/**
 * Create an empty sparse range index.
 *
 * @param compactNum number of most-recent entries kept dense (evicted last, see evictOnce)
 */
public SparseRangeIndex(int compactNum) {
    this(compactNum, new ArrayList<>());
}

public SparseRangeIndex(int compactNum, int sparsePadding, List<RangeIndex> sortedRangeIndexList) {
public SparseRangeIndex(int compactNum, List<RangeIndex> sortedRangeIndexList) {
this.compactNum = compactNum;
this.sparsePadding = sparsePadding;
init(sortedRangeIndexList);
}

/**
 * Replace the backing list (null is treated as empty) and recompute the
 * estimated byte size from the entry count.
 */
private void init(List<RangeIndex> sortedRangeIndexList) {
    List<RangeIndex> backing = sortedRangeIndexList == null ? new ArrayList<>() : sortedRangeIndexList;
    this.sortedRangeIndexList = backing;
    this.size = backing.size() * RangeIndex.OBJECT_SIZE;
}

public void append(RangeIndex newRangeIndex) {
/**
* Append new range index to the list.
* @param newRangeIndex the range index to append
* @return the change of size after appending
*/
public int append(RangeIndex newRangeIndex) {
int delta = 0;
if (newRangeIndex == null) {
return;
return delta;
}
if (!this.sortedRangeIndexList.isEmpty()
&& newRangeIndex.compareTo(this.sortedRangeIndexList.get(this.sortedRangeIndexList.size() - 1)) <= 0) {
LOGGER.error("Unexpected new range index {}, last: {}, maybe initialized with outdated index file, " +
"reset local cache", newRangeIndex, this.sortedRangeIndexList.get(this.sortedRangeIndexList.size() - 1));
delta -= size;
reset();
}
this.sortedRangeIndexList.add(newRangeIndex);
evict();
size += RangeIndex.OBJECT_SIZE;
return delta + RangeIndex.OBJECT_SIZE;
}

public void reset() {
this.sortedRangeIndexList.clear();
evictIndex = 0;
init(new ArrayList<>());
}

public void compact(RangeIndex newRangeIndex, Set<Long> compactedObjectIds) {
/**
* Compact the list by removing the compacted object ids and add the new range index if not null.
*
* @param newRangeIndex the new range index to add
* @param compactedObjectIds the object ids to compact
* @return the change of size after compacting
*/
public int compact(RangeIndex newRangeIndex, Set<Long> compactedObjectIds) {
if (compactedObjectIds.isEmpty()) {
append(newRangeIndex);
return;
return append(newRangeIndex);
}
List<RangeIndex> newRangeIndexList = new ArrayList<>();
boolean found = false;
Expand All @@ -68,29 +88,70 @@ public void compact(RangeIndex newRangeIndex, Set<Long> compactedObjectIds) {
continue;
}
if (newRangeIndex != null && !found && rangeIndex.compareTo(newRangeIndex) > 0) {
// insert new range index into the list
newRangeIndexList.add(newRangeIndex);
found = true;
}
newRangeIndexList.add(rangeIndex);
}
if (newRangeIndex != null && !found) {
// insert new range index into the end of the list
newRangeIndexList.add(newRangeIndex);
}
this.sortedRangeIndexList = newRangeIndexList;
int oldSize = size;
init(newRangeIndexList);
return size - oldSize;
}

private void evict() {
if (this.sortedRangeIndexList.size() > this.compactNum) {
if (evictIndex++ % (sparsePadding + 1) == 0) {
this.sortedRangeIndexList.remove(this.sortedRangeIndexList.size() - this.compactNum - 1);
/**
 * Evict a single range index entry. The eviction priority for each element is:
 * 1. any element that is neither the first nor one of the last N compacted elements
 * 2. the last N compacted elements
 * 3. the first element
 * <p>
 * For example, for a list of [0, 1, 2, 3, 4, 5] with compact number 2, successive
 * evictions yield:
 * <ul>
 * <li><code>1rst: [0, 2, 3, 4, 5]</code></li>
 * <li><code>2nd: [0, 2, 4, 5]</code></li>
 * <li><code>3rd: [0, 4, 5]</code></li>
 * <li><code>4th: [0, 5]</code></li>
 * <li><code>5th: [0]</code></li>
 * <li><code>6th: []</code></li>
 * </ul>
 *
 * @return number of bytes released by this eviction (0 when already empty)
 */
public int evictOnce() {
    int listLen = this.sortedRangeIndexList.size();
    if (listLen == 0) {
        // nothing left to evict
        return 0;
    }
    int target;
    if (listLen == 1) {
        // only the head remains; evict it
        target = 0;
    } else if (listLen <= 1 + compactNum) {
        // only the head plus the compacted tail remain; evict the oldest tail element
        target = 1;
    } else {
        // round-robin through the middle section, never touching the head (index 0)
        // nor the last compactNum elements
        if (evictIndex % listLen == 0 || evictIndex >= listLen - compactNum) {
            evictIndex = 1;
        }
        target = evictIndex++;
    }
    this.sortedRangeIndexList.remove(target);
    size -= RangeIndex.OBJECT_SIZE;
    return RangeIndex.OBJECT_SIZE;
}

public int size() {
public int length() {
return this.sortedRangeIndexList.size();
}

// Estimated size in bytes of this index (entry count * RangeIndex.OBJECT_SIZE).
public int size() {
    return size;
}

// Package-private accessor used by serialization (toBuffer/bufferSize); returns the
// internal list, not a copy — callers must not mutate it.
List<RangeIndex> getRangeIndexList() {
    return this.sortedRangeIndexList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected void newLargeObjectWriter(WriteOptions writeOptions, AbstractObjectSto
}

class ObjectWriter implements Writer {
// max upload size, when object data size is larger MAX_UPLOAD_SIZE, we should use multi-part upload to upload it.
// max upload size, when object data size is larger than MAX_UPLOAD_SIZE, we should use multi-part upload to upload it.
static final long MAX_UPLOAD_SIZE = 32L * 1024 * 1024;
CompletableFuture<Void> cf = new CompletableFuture<>();
CompositeByteBuf data = ByteBufAlloc.compositeByteBuffer();
Expand Down
Loading

0 comments on commit 2214e86

Please sign in to comment.