Skip to content

Commit

Permalink
perf(s3storage/wal): use PriorityQueue rather than TreeSet (#753)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Nov 29, 2023
1 parent 48897c5 commit ba18c3c
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public enum S3Operation {
/* S3 storage operations start */
APPEND_STORAGE(S3MetricsType.S3Storage, "append"),
APPEND_STORAGE_WAL(S3MetricsType.S3Storage, "append_wal"),
APPEND_STORAGE_WAL_BEFORE(S3MetricsType.S3Storage, "append_wal_before"),
APPEND_STORAGE_WAL_AWAIT(S3MetricsType.S3Storage, "append_wal_await"),
APPEND_STORAGE_WAL_WRITE(S3MetricsType.S3Storage, "append_wal_write"),
APPEND_STORAGE_WAL_AFTER(S3MetricsType.S3Storage, "append_wal_after"),
APPEND_STORAGE_APPEND_CALLBACK(S3MetricsType.S3Storage, "append_callback"),
APPEND_STORAGE_WAL_FULL(S3MetricsType.S3Storage, "append_wal_full"),
APPEND_STORAGE_LOG_CACHE(S3MetricsType.S3Storage, "append_log_cache"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.stream.s3.wal;

import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.metrics.stats.ByteBufMetricsStats;
import com.automq.stream.s3.wal.util.WALUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
Expand Down Expand Up @@ -104,9 +105,12 @@ public ByteBuf data() {
}

data = DirectByteBufAlloc.compositeByteBuffer();
for (Supplier<ByteBuf> record : records) {
data.addComponent(true, record.get());
for (Supplier<ByteBuf> supplier : records) {
ByteBuf record = supplier.get();
ByteBufMetricsStats.getHistogram("wal_record").update(record.readableBytes());
data.addComponent(true, record);
}
ByteBufMetricsStats.getHistogram("wal_block").update(data.readableBytes());
return data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ private AppendResult append0(ByteBuf body, int crc) throws OverCapacityException

final AppendResult appendResult = new AppendResultImpl(expectedWriteOffset, appendResultFuture);
appendResult.future().whenComplete((nil, ex) -> OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL_BEFORE).update(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return appendResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package com.automq.stream.s3.wal;

import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.wal.util.WALChannel;
import com.automq.stream.s3.wal.util.WALUtil;
import com.automq.stream.utils.FutureUtil;
Expand All @@ -30,8 +33,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -70,7 +73,7 @@ public class SlidingWindowService {
/**
* Blocks that are being written.
*/
private final TreeSet<Long> writingBlocks = new TreeSet<>();
private final Queue<Long> writingBlocks = new PriorityQueue<>();
/**
* Whether the service is initialized.
* After the service is initialized, data in {@link #windowCoreData} is valid.
Expand Down Expand Up @@ -329,15 +332,17 @@ private long wroteBlocksLocked(BlockBatch wroteBlocks) {
if (writingBlocks.isEmpty()) {
return getCurrentBlockLocked().startOffset();
}
return writingBlocks.first();
return writingBlocks.peek();
}

/**
 * Writes every block of the batch to its mapped position in the WAL channel and flushes,
 * recording the total write latency under {@code APPEND_STORAGE_WAL_WRITE}.
 */
private void writeBlockData(BlockBatch blocks) throws IOException {
    final TimerUtil writeTimer = new TimerUtil();
    for (Block block : blocks.blocks()) {
        // Map the logical record offset onto a physical position in the circular WAL file.
        long physicalPosition = WALUtil.recordOffsetToPosition(block.startOffset(), walChannel.capacity(), WAL_HEADER_TOTAL_CAPACITY);
        walChannel.write(block.data(), physicalPosition);
    }
    // Ensure durability before reporting latency, so the metric covers the flush as well.
    walChannel.flush();
    OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL_WRITE).update(writeTimer.elapsedAs(TimeUnit.NANOSECONDS));
}

private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException, OverCapacityException {
Expand Down Expand Up @@ -517,13 +522,16 @@ public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) throws I

class WriteBlockProcessor implements Runnable {
private final BlockBatch blocks;
private final TimerUtil timer;

// Starts the timer at construction (i.e. at enqueue time) so that run() can
// measure how long this batch waited in the executor queue before being written.
public WriteBlockProcessor(BlockBatch blocks) {
this.blocks = blocks;
this.timer = new TimerUtil();
}

@Override
public void run() {
// Record queue-wait time first: elapsed since construction is the time this
// task spent awaiting execution, sampled before any write work begins.
OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL_AWAIT).update(timer.elapsedAs(TimeUnit.NANOSECONDS));
writeBlock(this.blocks);
}

Expand All @@ -532,6 +540,7 @@ private void writeBlock(BlockBatch blocks) {
makeWriteOffsetMatchWindow(blocks.endOffset());
writeBlockData(blocks);

TimerUtil timer = new TimerUtil();
// Update the start offset of the sliding window after finishing writing the record.
windowCoreData.updateWindowStartOffset(wroteBlocks(blocks));

Expand All @@ -546,6 +555,7 @@ public String toString() {
return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}';
}
});
OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_WAL_AFTER).update(timer.elapsedAs(TimeUnit.NANOSECONDS));
} catch (Exception e) {
FutureUtil.completeExceptionally(blocks.futures(), e);
LOGGER.error(String.format("failed to write blocks, startOffset: %s", blocks.startOffset()), e);
Expand Down

0 comments on commit ba18c3c

Please sign in to comment.