Skip to content

Commit

Permalink
fix(s3stream/wal): fix checks before scaleOutWindow (#873)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Jan 3, 2024
1 parent 1fd1611 commit 7efe717
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,14 @@ public class SlidingWindowService {
*/
private Block currentBlock;

/**
* The thread pool for write operations.
*/
private ExecutorService ioExecutor;
/**
* The scheduler for polling blocks and sending them to @{@link #ioExecutor}.
*/
private ScheduledExecutorService pollBlockScheduler;

/**
* The last time when a batch of blocks is written to the disk.
Expand All @@ -121,7 +128,7 @@ public void start(AtomicLong windowMaxLength, long windowStartOffset) {
this.windowCoreData = new WindowCoreData(windowMaxLength, windowStartOffset, windowStartOffset);
this.ioExecutor = Threads.newFixedThreadPoolWithMonitor(ioThreadNums,
"block-wal-io-thread", false, LOGGER);
ScheduledExecutorService pollBlockScheduler = Threads.newSingleThreadScheduledExecutor(
this.pollBlockScheduler = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("wal-poll-block-thread-%d", false), LOGGER);
pollBlockScheduler.scheduleAtFixedRate(this::tryWriteBlock, 0, minWriteIntervalNanos, TimeUnit.NANOSECONDS);
initialized.set(true);
Expand All @@ -138,6 +145,7 @@ public boolean shutdown(long timeout, TimeUnit unit) {

boolean gracefulShutdown;
this.ioExecutor.shutdown();
this.pollBlockScheduler.shutdownNow();
try {
gracefulShutdown = this.ioExecutor.awaitTermination(timeout, unit);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -348,25 +356,15 @@ private void writeBlockData(BlockBatch blocks) throws IOException {
S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_WRITE);
}

private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException, OverCapacityException {
private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException {
// align to block size
newWindowEndOffset = WALUtil.alignLargeByBlockSize(newWindowEndOffset);
long windowStartOffset = windowCoreData.getStartOffset();
long windowMaxLength = windowCoreData.getMaxLength();
if (newWindowEndOffset > windowStartOffset + windowMaxLength) {
long newWindowMaxLength = newWindowEndOffset - windowStartOffset + scaleUnit;
if (newWindowMaxLength > upperLimit) {
// exceed upper limit
if (newWindowEndOffset - windowStartOffset >= upperLimit) {
// however, the new window length is still larger than upper limit, so we just set it to upper limit
newWindowMaxLength = upperLimit;
} else {
// the new window length is bigger than upper limit, reject this write request
LOGGER.error("new windows size {} exceeds upper limit {}, reject this write request, window start offset: {}, new window end offset: {}",
newWindowMaxLength, upperLimit, windowStartOffset, newWindowEndOffset);
throw new OverCapacityException(String.format("new windows size exceeds upper limit %d", upperLimit));
}
}
// endOffset - startOffset <= block.maxSize <= upperLimit in {@link #sealAndNewBlockLocked}
assert newWindowEndOffset - windowStartOffset <= upperLimit;
long newWindowMaxLength = Math.min(newWindowEndOffset - windowStartOffset + scaleUnit, upperLimit);
windowCoreData.scaleOutWindow(walHeaderFlusher, newWindowMaxLength);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private static void resetWALHeader(String path) throws IOException {
}
}

private static void logIt(Config config, Stat stat) {
private static Runnable logIt(Config config, Stat stat) {
ScheduledExecutorService statExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stat-thread-%d", true), null);
statExecutor.scheduleAtFixedRate(() -> {
Expand All @@ -120,6 +120,7 @@ private static void logIt(Config config, Stat stat) {
(double) result.maxCostNanos() / TimeUnit.MILLISECONDS.toNanos(1));
}
}, LOG_INTERVAL_SECONDS, LOG_INTERVAL_SECONDS, TimeUnit.SECONDS);
return statExecutor::shutdownNow;
}

private void run(Config config) {
Expand All @@ -129,7 +130,7 @@ private void run(Config config) {
config.threads, ThreadUtils.createThreadFactory("append-thread-%d", false), null);
AppendTaskConfig appendTaskConfig = new AppendTaskConfig(config);
Stat stat = new Stat();
runTrimTask();
Runnable stopTrim = runTrimTask();
for (int i = 0; i < config.threads; i++) {
int index = i;
executor.submit(() -> {
Expand All @@ -141,7 +142,7 @@ private void run(Config config) {
}
});
}
logIt(config, stat);
Runnable stopLog = logIt(config, stat);

executor.shutdown();
try {
Expand All @@ -151,11 +152,13 @@ private void run(Config config) {
} catch (InterruptedException e) {
executor.shutdownNow();
}
stopLog.run();
stopTrim.run();

System.out.println("Benchmark finished");
}

private void runTrimTask() {
private Runnable runTrimTask() {
ScheduledExecutorService trimExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("trim-thread-%d", true), null);
trimExecutor.scheduleAtFixedRate(() -> {
Expand All @@ -166,6 +169,7 @@ private void runTrimTask() {
e.printStackTrace();
}
}, TRIM_INTERVAL_MILLIS, TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
return trimExecutor::shutdownNow;
}

private void runAppendTask(int index, AppendTaskConfig config, Stat stat) throws Exception {
Expand Down

0 comments on commit 7efe717

Please sign in to comment.