diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java index f61f32c4a3..5720d8ece4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java @@ -162,12 +162,12 @@ public static BlockWALServiceBuilder recoveryBuilder(String path) { return new BlockWALServiceBuilder(path).recoveryMode(true); } - private void flushWALHeader(ShutdownType shutdownType) { + private void flushWALHeader(ShutdownType shutdownType) throws IOException { walHeader.setShutdownType(shutdownType); flushWALHeader(); } - private synchronized void flushWALHeader() { + private synchronized void flushWALHeader() throws IOException { long position = writeHeaderRoundTimes.getAndIncrement() % WAL_HEADER_COUNT * WAL_HEADER_CAPACITY; walHeader.setLastWriteTimestamp(System.nanoTime()); long trimOffset = walHeader.getTrimOffset(); @@ -363,7 +363,7 @@ private BlockWALHeader newWALHeader() { return new BlockWALHeader(walChannel.capacity(), initialWindowSize); } - private void walHeaderReady(BlockWALHeader header) { + private void walHeaderReady(BlockWALHeader header) throws IOException { if (nodeId != NOOP_NODE_ID) { header.setNodeId(nodeId); header.setEpoch(epoch); @@ -392,7 +392,11 @@ public void shutdownGracefully() { boolean gracefulShutdown = Optional.ofNullable(slidingWindowService) .map(s -> s.shutdown(1, TimeUnit.DAYS)) .orElse(true); - flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY); + try { + flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY); + } catch (IOException ignored) { + // shutdown anyway + } walChannel.close(); @@ -506,7 +510,13 @@ private CompletableFuture trim(long offset, boolean internal) { } walHeader.updateTrimOffset(offset); - return CompletableFuture.runAsync(this::flushWALHeader, walHeaderFlusher); + return CompletableFuture.runAsync(() -> { + try { + flushWALHeader(); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + }, walHeaderFlusher); } private void checkStarted() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java index 993d0165bf..2434c58ed5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java @@ -386,7 +386,7 @@ private void writeBlockData(BlockBatch blocks) throws IOException { StorageOperationStats.getInstance().appendWALWriteStats.record(TimerUtil.durationElapsedAs(start, TimeUnit.NANOSECONDS)); } - private void makeWriteOffsetMatchWindow(long newWindowEndOffset) { + private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException { // align to block size newWindowEndOffset = WALUtil.alignLargeByBlockSize(newWindowEndOffset); long windowStartOffset = windowCoreData.getStartOffset(); @@ -400,7 +400,7 @@ private void makeWriteOffsetMatchWindow(long newWindowEndOffset) { } public interface WALHeaderFlusher { - void flush(); + void flush() throws IOException; } public static class WindowCoreData { @@ -442,7 +442,7 @@ public void updateWindowStartOffset(long offset) { this.startOffset.accumulateAndGet(offset, Math::max); } - public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) { + public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) throws IOException { boolean scaleWindowHappened = false; scaleOutLock.lock(); try { diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java index 0348d32f91..a4ab990784 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java @@ -135,21 +135,28 @@ default void writeAndFlush(ByteBuf src, long position) throws IOException { flush(); } - default void retryWriteAndFlush(ByteBuf src, long position) { - retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL); + default void retryWriteAndFlush(ByteBuf src, long position) throws IOException { + retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT); } /** - * Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success. + * Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success or timeout. */ - default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis) { + default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException { + long start = System.nanoTime(); + long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis); while (true) { try { writeAndFlush(src, position); break; } catch (IOException e) { - LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e); - Threads.sleep(retryIntervalMillis); + if (System.nanoTime() - start > retryTimeoutNanos) { + LOGGER.error("Failed to write and flush, retry timeout", e); + throw e; + } else { + LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e); + Threads.sleep(retryIntervalMillis); + } } } }