Skip to content

Commit

Permalink
fix(wal): shutdown the pollBlockScheduler during shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Jan 3, 2024
1 parent c9f3c0c commit efb9950
Showing 1 changed file with 9 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,14 @@ public class SlidingWindowService {
*/
private Block currentBlock;

/**
* The thread pool for write operations.
*/
private ExecutorService ioExecutor;
/**
* The scheduler for polling blocks and sending them to @{@link #ioExecutor}.
*/
private ScheduledExecutorService pollBlockScheduler;

/**
* The last time when a batch of blocks is written to the disk.
Expand All @@ -121,7 +128,7 @@ public void start(AtomicLong windowMaxLength, long windowStartOffset) {
this.windowCoreData = new WindowCoreData(windowMaxLength, windowStartOffset, windowStartOffset);
this.ioExecutor = Threads.newFixedThreadPoolWithMonitor(ioThreadNums,
"block-wal-io-thread", false, LOGGER);
ScheduledExecutorService pollBlockScheduler = Threads.newSingleThreadScheduledExecutor(
this.pollBlockScheduler = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("wal-poll-block-thread-%d", false), LOGGER);
pollBlockScheduler.scheduleAtFixedRate(this::tryWriteBlock, 0, minWriteIntervalNanos, TimeUnit.NANOSECONDS);
initialized.set(true);
Expand All @@ -138,6 +145,7 @@ public boolean shutdown(long timeout, TimeUnit unit) {

boolean gracefulShutdown;
this.ioExecutor.shutdown();
this.pollBlockScheduler.shutdownNow();
try {
gracefulShutdown = this.ioExecutor.awaitTermination(timeout, unit);
} catch (InterruptedException e) {
Expand Down

0 comments on commit efb9950

Please sign in to comment.