Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metadata): limit the size of sparse index cache #1833

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,13 @@ public CompletableFuture<ByteBuf> readNodeRangeIndex(long nodeId) {
Map<Long, SparseRangeIndex> streamRangeIndexMap;
if (nodeId == BROKER0) {
streamRangeIndexMap = Map.of(
STREAM0, new SparseRangeIndex(2, 1, List.of(
STREAM0, new SparseRangeIndex(2, List.of(
new RangeIndex(100, 120, 0),
new RangeIndex(180, 200, 2),
new RangeIndex(520, 600, 4))));
} else {
streamRangeIndexMap = Map.of(
STREAM0, new SparseRangeIndex(2, 1, List.of(
STREAM0, new SparseRangeIndex(2, List.of(
new RangeIndex(140, 160, 5),
new RangeIndex(420, 520, 7),
// objectId 8 is not exist (compacted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3Obje
logger.info("No stream set objects to compact");
return;
}
logger.info("Build compact request for {} stream set objects complete, stream set object id: {}, stresam set object size: {}, stream object num: {}, time cost: {}, start committing objects",
logger.info("Build compact request for {} stream set objects complete, stream set object id: {}, stream set object size: {}, stream object num: {}, time cost: {}, start committing objects",
request.getCompactedObjectIds().size(), request.getObjectId(), request.getObjectSize(), request.getStreamObjects().size(), timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
timerUtil.reset();
objectManager.commitStreamSetObject(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,31 @@
public class LocalStreamRangeIndexCache implements S3StreamClient.StreamLifeCycleListener {
private static final short VERSION = 0;
private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamRangeIndexCache.class);
private static final int COMPACT_NUM = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_COMPACT_NUM", 5);
private static final int SPARSE_PADDING = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_SPARSE_PADDING", 1);
private static final int COMPACT_NUM = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_COMPACT_NUM", 3);
public static final int MAX_INDEX_SIZE = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_MAX_SIZE", 5 * 1024 * 1024);
private final Map<Long, SparseRangeIndex> streamRangeIndexMap = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("upload-index", true));
private final Queue<CompletableFuture<Void>> uploadQueue = new LinkedList<>();
private final CompletableFuture<Void> initCf = new CompletableFuture<>();
private long nodeId = -1;
private ObjectStorage objectStorage;
private CompletableFuture<Void> initCf = new CompletableFuture<>();
private int totalSize = 0;

public void start() {
executorService.scheduleAtFixedRate(this::batchUpload, 0, 10, TimeUnit.MILLISECONDS);
executorService.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.MINUTES);
}

// for test
void reset() {
writeLock.lock();
try {
streamRangeIndexMap.clear();
initCf = new CompletableFuture<>();
} finally {
writeLock.unlock();
}
public int totalSize() {
return totalSize;
}

CompletableFuture<Void> initCf() {
return initCf;
}

// test only
Expand Down Expand Up @@ -158,7 +156,8 @@ public void init(int nodeId, ObjectStorage objectStorage) {
writeLock.lock();
try {
for (Map.Entry<Long, List<RangeIndex>> entry : LocalStreamRangeIndexCache.fromBuffer(data).entrySet()) {
this.streamRangeIndexMap.put(entry.getKey(), new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING, entry.getValue()));
this.streamRangeIndexMap.put(entry.getKey(), new SparseRangeIndex(COMPACT_NUM, entry.getValue()));
this.totalSize += entry.getValue().size() * RangeIndex.OBJECT_SIZE;
}
} finally {
writeLock.unlock();
Expand Down Expand Up @@ -207,16 +206,44 @@ public CompletableFuture<Void> append(Map<Long, RangeIndex> rangeIndexMap) {
for (Map.Entry<Long, RangeIndex> entry : rangeIndexMap.entrySet()) {
long streamId = entry.getKey();
RangeIndex rangeIndex = entry.getValue();
streamRangeIndexMap.computeIfAbsent(streamId,
k -> new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING)).append(rangeIndex);
totalSize += streamRangeIndexMap.computeIfAbsent(streamId,
k -> new SparseRangeIndex(COMPACT_NUM)).append(rangeIndex);
}
evictIfNecessary();
} finally {
writeLock.unlock();
}
return null;
});
}

private void evictIfNecessary() {
if (totalSize <= MAX_INDEX_SIZE) {
return;
}
boolean evicted = false;
boolean hasSufficientIndex = true;
List<SparseRangeIndex> streamRangeIndexList = new ArrayList<>(streamRangeIndexMap.values());
Collections.shuffle(streamRangeIndexList);
while (totalSize > MAX_INDEX_SIZE) {
// try to evict from each stream in round-robin manner
for (SparseRangeIndex sparseRangeIndex : streamRangeIndexList) {
if (sparseRangeIndex.length() <= 1 + COMPACT_NUM && hasSufficientIndex) {
// skip evict if there is still sufficient stream to be evicted
continue;
}
totalSize -= sparseRangeIndex.evictOnce();
evicted = true;
if (totalSize <= MAX_INDEX_SIZE) {
break;
}
}
if (!evicted) {
hasSufficientIndex = false;
}
}
}

public CompletableFuture<Void> compact(Map<Long, RangeIndex> rangeIndexMap, Set<Long> compactedObjectIds) {
return exec(() -> {
writeLock.lock();
Expand All @@ -225,8 +252,8 @@ public CompletableFuture<Void> compact(Map<Long, RangeIndex> rangeIndexMap, Set<
Iterator<Map.Entry<Long, SparseRangeIndex>> iterator = streamRangeIndexMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, SparseRangeIndex> entry = iterator.next();
entry.getValue().compact(null, compactedObjectIds);
if (entry.getValue().size() == 0) {
totalSize += entry.getValue().compact(null, compactedObjectIds);
if (entry.getValue().length() == 0) {
iterator.remove();
}
}
Expand All @@ -237,10 +264,10 @@ public CompletableFuture<Void> compact(Map<Long, RangeIndex> rangeIndexMap, Set<
RangeIndex rangeIndex = entry.getValue();
streamRangeIndexMap.compute(streamId, (k, v) -> {
if (v == null) {
v = new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING);
v = new SparseRangeIndex(COMPACT_NUM);
}
v.compact(rangeIndex, compactedObjectIds);
if (v.size() == 0) {
totalSize += v.compact(rangeIndex, compactedObjectIds);
if (v.length() == 0) {
// remove stream with empty index
return null;
}
Expand Down Expand Up @@ -270,11 +297,7 @@ public CompletableFuture<Void> updateIndexFromRequest(CommitStreamSetObjectReque
}

public static ByteBuf toBuffer(Map<Long, SparseRangeIndex> streamRangeIndexMap) {
int capacity = Short.BYTES // version
+ Integer.BYTES // stream num
+ streamRangeIndexMap.values().stream().mapToInt(index -> Long.BYTES // stream id
+ Integer.BYTES // range index num
+ index.getRangeIndexList().size() * (3 * Long.BYTES)).sum();
int capacity = bufferSize(streamRangeIndexMap);
ByteBuf buffer = ByteBufAlloc.byteBuffer(capacity);
try {
buffer.writeShort(VERSION);
Expand All @@ -295,6 +318,14 @@ public static ByteBuf toBuffer(Map<Long, SparseRangeIndex> streamRangeIndexMap)
return buffer;
}

private static int bufferSize(Map<Long, SparseRangeIndex> streamRangeIndexMap) {
return Short.BYTES // version
+ Integer.BYTES // stream num
+ streamRangeIndexMap.values().stream().mapToInt(index -> Long.BYTES // stream id
+ Integer.BYTES // range index num
+ index.getRangeIndexList().size() * (3 * Long.BYTES)).sum();
}

public static Map<Long, List<RangeIndex>> fromBuffer(ByteBuf data) {
Map<Long, List<RangeIndex>> rangeIndexMap = new HashMap<>();
short version = data.readShort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,64 @@
public class SparseRangeIndex {
private static final Logger LOGGER = LoggerFactory.getLogger(SparseRangeIndex.class);
private final int compactNum;
private final int sparsePadding;
// sorted by startOffset in descending order
private List<RangeIndex> sortedRangeIndexList;
private long evictIndex = 0;
private int size = 0;
private int evictIndex = 0;

public SparseRangeIndex(int compactNum, int sparsePadding) {
this(compactNum, sparsePadding, new ArrayList<>());
public SparseRangeIndex(int compactNum) {
this(compactNum, new ArrayList<>());
}

public SparseRangeIndex(int compactNum, int sparsePadding, List<RangeIndex> sortedRangeIndexList) {
public SparseRangeIndex(int compactNum, List<RangeIndex> sortedRangeIndexList) {
this.compactNum = compactNum;
this.sparsePadding = sparsePadding;
init(sortedRangeIndexList);
}

private void init(List<RangeIndex> sortedRangeIndexList) {
if (sortedRangeIndexList == null) {
sortedRangeIndexList = new ArrayList<>();
}
this.sortedRangeIndexList = sortedRangeIndexList;
this.size = sortedRangeIndexList.size() * RangeIndex.OBJECT_SIZE;
}

public void append(RangeIndex newRangeIndex) {
/**
* Append new range index to the list.
* @param newRangeIndex the range index to append
* @return the change of size after appending
*/
public int append(RangeIndex newRangeIndex) {
int delta = 0;
if (newRangeIndex == null) {
return;
return delta;
}
if (!this.sortedRangeIndexList.isEmpty()
&& newRangeIndex.compareTo(this.sortedRangeIndexList.get(this.sortedRangeIndexList.size() - 1)) <= 0) {
LOGGER.error("Unexpected new range index {}, last: {}, maybe initialized with outdated index file, " +
"reset local cache", newRangeIndex, this.sortedRangeIndexList.get(this.sortedRangeIndexList.size() - 1));
delta -= size;
reset();
}
this.sortedRangeIndexList.add(newRangeIndex);
evict();
size += RangeIndex.OBJECT_SIZE;
return delta + RangeIndex.OBJECT_SIZE;
}

public void reset() {
this.sortedRangeIndexList.clear();
evictIndex = 0;
init(new ArrayList<>());
}

public void compact(RangeIndex newRangeIndex, Set<Long> compactedObjectIds) {
/**
* Compact the list by removing the compacted object ids and add the new range index if not null.
*
* @param newRangeIndex the new range index to add
* @param compactedObjectIds the object ids to compact
* @return the change of size after compacting
*/
public int compact(RangeIndex newRangeIndex, Set<Long> compactedObjectIds) {
if (compactedObjectIds.isEmpty()) {
append(newRangeIndex);
return;
return append(newRangeIndex);
}
List<RangeIndex> newRangeIndexList = new ArrayList<>();
boolean found = false;
Expand All @@ -68,29 +88,70 @@ public void compact(RangeIndex newRangeIndex, Set<Long> compactedObjectIds) {
continue;
}
if (newRangeIndex != null && !found && rangeIndex.compareTo(newRangeIndex) > 0) {
// insert new range index into the list
newRangeIndexList.add(newRangeIndex);
found = true;
}
newRangeIndexList.add(rangeIndex);
}
if (newRangeIndex != null && !found) {
// insert new range index into the end of the list
newRangeIndexList.add(newRangeIndex);
}
this.sortedRangeIndexList = newRangeIndexList;
int oldSize = size;
init(newRangeIndexList);
return size - oldSize;
}

private void evict() {
if (this.sortedRangeIndexList.size() > this.compactNum) {
if (evictIndex++ % (sparsePadding + 1) == 0) {
this.sortedRangeIndexList.remove(this.sortedRangeIndexList.size() - this.compactNum - 1);
/**
* Try to evict one range index from the list, the eviction priority for each element is:
* 1. any element that's not the first and last N compacted elements
* 2. the last N compacted elements
* 3. the first element
* <p>
* For example for a list of [0, 1, 2, 3, 4, 5], compact number is 2, the eviction result will be:
* <ul>
* <li><code>1rst: [0, 2, 3, 4, 5]</code></li>
* <li><code>2nd: [0, 2, 4, 5]</code></li>
* <li><code>3rd: [0, 4, 5]</code></li>
* <li><code>4th: [0, 5]</code></li>
* <li><code>5th: [0]</code></li>
* <li><code>6th: []</code></li>
* </ul>
*
* @return evicted size
*/
public int evictOnce() {
int indexToEvict = -1;
if (this.sortedRangeIndexList.isEmpty()) {
return 0;
} else if (this.sortedRangeIndexList.size() == 1) {
// evict the only element
indexToEvict = 0;
} else if (this.sortedRangeIndexList.size() <= (1 + compactNum)) {
indexToEvict = 1;
}

if (indexToEvict == -1) {
if (evictIndex % this.sortedRangeIndexList.size() == 0
|| this.evictIndex >= this.sortedRangeIndexList.size() - compactNum) {
this.evictIndex = 1;
}
indexToEvict = evictIndex++;
}
this.sortedRangeIndexList.remove(indexToEvict);
size -= RangeIndex.OBJECT_SIZE;
return RangeIndex.OBJECT_SIZE;
}

public int size() {
public int length() {
return this.sortedRangeIndexList.size();
}

public int size() {
return size;
}

List<RangeIndex> getRangeIndexList() {
return this.sortedRangeIndexList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected void newLargeObjectWriter(WriteOptions writeOptions, AbstractObjectSto
}

class ObjectWriter implements Writer {
// max upload size, when object data size is larger MAX_UPLOAD_SIZE, we should use multi-part upload to upload it.
// max upload size, when object data size is larger than MAX_UPLOAD_SIZE, we should use multi-part upload to upload it.
static final long MAX_UPLOAD_SIZE = 32L * 1024 * 1024;
CompletableFuture<Void> cf = new CompletableFuture<>();
CompositeByteBuf data = ByteBufAlloc.compositeByteBuffer();
Expand Down
Loading
Loading