diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java index cc6f94a0d..657615310 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java @@ -94,7 +94,14 @@ public class SlidingWindowService { */ private Block currentBlock; + /** + * The thread pool for write operations. + */ private ExecutorService ioExecutor; + /** + * The scheduler for polling blocks and sending them to @{@link #ioExecutor}. + */ + private ScheduledExecutorService pollBlockScheduler; /** * The last time when a batch of blocks is written to the disk. @@ -121,7 +128,7 @@ public void start(AtomicLong windowMaxLength, long windowStartOffset) { this.windowCoreData = new WindowCoreData(windowMaxLength, windowStartOffset, windowStartOffset); this.ioExecutor = Threads.newFixedThreadPoolWithMonitor(ioThreadNums, "block-wal-io-thread", false, LOGGER); - ScheduledExecutorService pollBlockScheduler = Threads.newSingleThreadScheduledExecutor( + this.pollBlockScheduler = Threads.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("wal-poll-block-thread-%d", false), LOGGER); pollBlockScheduler.scheduleAtFixedRate(this::tryWriteBlock, 0, minWriteIntervalNanos, TimeUnit.NANOSECONDS); initialized.set(true); @@ -138,6 +145,7 @@ public boolean shutdown(long timeout, TimeUnit unit) { boolean gracefulShutdown; this.ioExecutor.shutdown(); + this.pollBlockScheduler.shutdownNow(); try { gracefulShutdown = this.ioExecutor.awaitTermination(timeout, unit); } catch (InterruptedException e) { @@ -348,25 +356,15 @@ private void writeBlockData(BlockBatch blocks) throws IOException { S3StreamMetricsManager.recordStageLatency(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.APPEND_WAL_WRITE); } - private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException, OverCapacityException { + private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException { // align to block size newWindowEndOffset = WALUtil.alignLargeByBlockSize(newWindowEndOffset); long windowStartOffset = windowCoreData.getStartOffset(); long windowMaxLength = windowCoreData.getMaxLength(); if (newWindowEndOffset > windowStartOffset + windowMaxLength) { - long newWindowMaxLength = newWindowEndOffset - windowStartOffset + scaleUnit; - if (newWindowMaxLength > upperLimit) { - // exceed upper limit - if (newWindowEndOffset - windowStartOffset >= upperLimit) { - // however, the new window length is still larger than upper limit, so we just set it to upper limit - newWindowMaxLength = upperLimit; - } else { - // the new window length is bigger than upper limit, reject this write request - LOGGER.error("new windows size {} exceeds upper limit {}, reject this write request, window start offset: {}, new window end offset: {}", - newWindowMaxLength, upperLimit, windowStartOffset, newWindowEndOffset); - throw new OverCapacityException(String.format("new windows size exceeds upper limit %d", upperLimit)); - } - } + // endOffset - startOffset <= block.maxSize <= upperLimit in {@link #sealAndNewBlockLocked} + assert newWindowEndOffset - windowStartOffset <= upperLimit; + long newWindowMaxLength = Math.min(newWindowEndOffset - windowStartOffset + scaleUnit, upperLimit); windowCoreData.scaleOutWindow(walHeaderFlusher, newWindowMaxLength); } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java index 28e26393b..f50042617 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java @@ -107,7 +107,7 @@ private static void resetWALHeader(String path) throws IOException { } } - private static void logIt(Config config, Stat stat) { + private static Runnable logIt(Config config, Stat stat) { ScheduledExecutorService statExecutor = Threads.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("stat-thread-%d", true), null); statExecutor.scheduleAtFixedRate(() -> { @@ -120,6 +120,7 @@ private static void logIt(Config config, Stat stat) { (double) result.maxCostNanos() / TimeUnit.MILLISECONDS.toNanos(1)); } }, LOG_INTERVAL_SECONDS, LOG_INTERVAL_SECONDS, TimeUnit.SECONDS); + return statExecutor::shutdownNow; } private void run(Config config) { @@ -129,7 +130,7 @@ private void run(Config config) { config.threads, ThreadUtils.createThreadFactory("append-thread-%d", false), null); AppendTaskConfig appendTaskConfig = new AppendTaskConfig(config); Stat stat = new Stat(); - runTrimTask(); + Runnable stopTrim = runTrimTask(); for (int i = 0; i < config.threads; i++) { int index = i; executor.submit(() -> { @@ -141,7 +142,7 @@ private void run(Config config) { } }); } - logIt(config, stat); + Runnable stopLog = logIt(config, stat); executor.shutdown(); try { @@ -151,11 +152,13 @@ private void run(Config config) { } catch (InterruptedException e) { executor.shutdownNow(); } + stopLog.run(); + stopTrim.run(); System.out.println("Benchmark finished"); } - private void runTrimTask() { + private Runnable runTrimTask() { ScheduledExecutorService trimExecutor = Threads.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("trim-thread-%d", true), null); trimExecutor.scheduleAtFixedRate(() -> { @@ -166,6 +169,7 @@ private void runTrimTask() { e.printStackTrace(); } }, TRIM_INTERVAL_MILLIS, TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + return trimExecutor::shutdownNow; } private void runAppendTask(int index, AppendTaskConfig config, Stat stat) throws Exception {