From ad0a9a598969ccacab8cd2f204f9a8fd8ca25043 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Mon, 19 Feb 2024 17:59:27 +0800 Subject: [PATCH 1/3] fix: handle exceptions thrown from the wal Signed-off-by: Ning Yu --- .../src/main/java/com/automq/stream/s3/S3Storage.java | 9 ++++++++- .../main/java/com/automq/stream/s3/WalWriteRequest.java | 9 +++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 9c2d544f5..c577064b4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -394,7 +394,14 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f request.cf.completeExceptionally(e); return false; } - appendResult.future().thenAccept(nil -> handleAppendCallback(request)); + appendResult.future().whenComplete((nil, ex) -> { + if (ex != null) { + // no exception should be thrown from the WAL + LOGGER.error("[UNEXPECTED] append WAL fail, request {}", request, ex); + return; + } + handleAppendCallback(request); + }); return false; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java b/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java index 62b986231..01919c66f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java +++ b/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java @@ -37,4 +37,13 @@ public WalWriteRequest(StreamRecordBatch record, long offset, CompletableFuture< public int compareTo(WalWriteRequest o) { return record.compareTo(o.record); } + + @Override + public String toString() { + return "WalWriteRequest{" + + "record=" + record + + ", offset=" + offset + + ", persisted=" + persisted + + '}'; + } } From 007a0fee390146cf22ba909a97806134a01bbddb Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Mon, 19 Feb 2024 20:14:44 +0800 Subject: [PATCH 2/3] feat: retry when IOException Signed-off-by: Ning Yu --- .../automq/stream/s3/wal/util/WALChannel.java | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java index 1f47d4c07..c8818f679 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java @@ -13,6 +13,7 @@ import com.automq.stream.s3.wal.WALCapacityMismatchException; import com.automq.stream.s3.wal.WALNotInitializedException; +import com.automq.stream.utils.Threads; import io.netty.buffer.ByteBuf; import java.io.IOException; import org.slf4j.Logger; @@ -27,7 +28,10 @@ */ public interface WALChannel { + Logger LOGGER = LoggerFactory.getLogger(WALChannel.class); + String DEVICE_PREFIX = "/dev/"; + long DEFAULT_RETRY_INTERVAL = 100L; static WALChannelBuilder builder(String path) { return new WALChannelBuilder(path); @@ -63,11 +67,49 @@ default void open() throws IOException { */ void write(ByteBuf src, long position) throws IOException; + default void retryWrite(ByteBuf src, long position) { + retryWrite(src, position, DEFAULT_RETRY_INTERVAL); + } + + /** + * Retry {@link #write(ByteBuf, long)} with the given interval until success. + */ + default void retryWrite(ByteBuf src, long position, long retryIntervalMillis) { + while (true) { + try { + write(src, position); + break; + } catch (IOException e) { + LOGGER.error("Failed to write, retrying in {}ms", retryIntervalMillis, e); + Threads.sleep(retryIntervalMillis); + } + } + } + /** * Flush to disk. */ void flush() throws IOException; + default void retryFlush() { + retryFlush(DEFAULT_RETRY_INTERVAL); + } + + /** + * Retry {@link #flush()} with the given interval until success. + */ + default void retryFlush(long retryIntervalMillis) { + while (true) { + try { + flush(); + break; + } catch (IOException e) { + LOGGER.error("Failed to flush, retrying in {}ms", retryIntervalMillis, e); + Threads.sleep(retryIntervalMillis); + } + } + } + /** * Call {@link #write(ByteBuf, long)} and {@link #flush()}. */ @@ -76,6 +118,25 @@ default void writeAndFlush(ByteBuf src, long position) throws IOException { flush(); } + default void retryWriteAndFlush(ByteBuf src, long position) { + retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL); + } + + /** + * Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success. + */ + default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis) { + while (true) { + try { + writeAndFlush(src, position); + break; + } catch (IOException e) { + LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e); + Threads.sleep(retryIntervalMillis); + } + } + } + /** * Read bytes from the given position of the channel to the given buffer from the current writer index * until reaching the capacity of the buffer or the end of the channel. @@ -86,6 +147,24 @@ default int read(ByteBuf dst, long position) throws IOException { return read(dst, position, dst.writableBytes()); } + default int retryRead(ByteBuf dst, long position) { + return retryRead(dst, position, DEFAULT_RETRY_INTERVAL); + } + + /** + * Retry {@link #read(ByteBuf, long)} with the given interval until success. + */ + default int retryRead(ByteBuf dst, long position, long retryIntervalMillis) { + while (true) { + try { + return read(dst, position); + } catch (IOException e) { + LOGGER.error("Failed to read, retrying in {}ms", retryIntervalMillis, e); + Threads.sleep(retryIntervalMillis); + } + } + } + /** * Read bytes from the given position of the channel to the given buffer from the current writer index * until reaching the given length or the end of the channel. @@ -96,6 +175,24 @@ default int read(ByteBuf dst, long position) throws IOException { */ int read(ByteBuf dst, long position, int length) throws IOException; + default int retryRead(ByteBuf dst, long position, int length) { + return retryRead(dst, position, length, DEFAULT_RETRY_INTERVAL); + } + + /** + * Retry {@link #read(ByteBuf, long, int)} with the given interval until success. + */ + default int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis) { + while (true) { + try { + return read(dst, position, length); + } catch (IOException e) { + LOGGER.error("Failed to read, retrying in {}ms", retryIntervalMillis, e); + Threads.sleep(retryIntervalMillis); + } + } + } + boolean useDirectIO(); interface CapacityReader { From 2e1e14b9d37e0a0613e8cc178530b890b2c26ab6 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Mon, 19 Feb 2024 20:21:38 +0800 Subject: [PATCH 3/3] refactor: retry to read/write/flush when IOException occurs Signed-off-by: Ning Yu --- .../automq/stream/s3/wal/BlockWALService.java | 80 ++++++------------- .../stream/s3/wal/SlidingWindowService.java | 60 +++++++------- 2 files changed, 54 insertions(+), 86 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index 91490cfba..5b3d90399 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -33,7 +33,6 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -125,6 +124,9 @@ public class BlockWALService implements WriteAheadLog { */ private long recoveryCompleteOffset = -1; + private BlockWALService() { + } + public static BlockWALServiceBuilder builder(String path, long capacity) { return new BlockWALServiceBuilder(path, capacity); } @@ -133,28 +135,20 @@ public static BlockWALServiceBuilder recoveryBuilder(String path) { return new BlockWALServiceBuilder(path); } - private BlockWALService() { - } - - private void flushWALHeader(ShutdownType shutdownType) throws IOException { + private void flushWALHeader(ShutdownType shutdownType) { walHeader.setShutdownType(shutdownType); flushWALHeader(); } - private synchronized void flushWALHeader() throws IOException { + private synchronized void flushWALHeader() { long position = writeHeaderRoundTimes.getAndIncrement() % WAL_HEADER_COUNT * WAL_HEADER_CAPACITY; - try { - walHeader.setLastWriteTimestamp(System.nanoTime()); - long trimOffset = walHeader.getTrimOffset(); - ByteBuf buf = walHeader.marshal(); - this.walChannel.writeAndFlush(buf, position); - buf.release(); - walHeader.updateFlushedTrimOffset(trimOffset); - LOGGER.debug("WAL header flushed, position: {}, header: {}", position, walHeader); - } catch (IOException e) { - LOGGER.error("failed to flush WAL header, position: {}, header: {}", position, walHeader, e); - throw e; - } + walHeader.setLastWriteTimestamp(System.nanoTime()); + long trimOffset = walHeader.getTrimOffset(); + ByteBuf buf = walHeader.marshal(); + this.walChannel.retryWriteAndFlush(buf, position); + buf.release(); + walHeader.updateFlushedTrimOffset(trimOffset); + LOGGER.debug("WAL header flushed, position: {}, header: {}", position, walHeader); } /** @@ -188,19 +182,11 @@ private ByteBuf readRecord(long recoverStartOffset, private SlidingWindowService.RecordHeaderCoreData parseRecordHeader(long recoverStartOffset, ByteBuf recordHeader, Function logicalToPhysical) throws ReadRecordException { final long position = logicalToPhysical.apply(recoverStartOffset); - try { - int read = walChannel.read(recordHeader, position); - if (read != RECORD_HEADER_SIZE) { - throw new ReadRecordException( - WALUtil.alignNextBlock(recoverStartOffset), - String.format("failed to read record header: expected %d bytes, actual %d bytes, recoverStartOffset: %d", RECORD_HEADER_SIZE, read, recoverStartOffset) - ); - } - } catch (IOException e) { - LOGGER.error("failed to read record header, position: {}, recoverStartOffset: {}", position, recoverStartOffset, e); + int read = walChannel.retryRead(recordHeader, position); + if (read != RECORD_HEADER_SIZE) { throw new ReadRecordException( WALUtil.alignNextBlock(recoverStartOffset), - String.format("failed to read record header, recoverStartOffset: %d", recoverStartOffset) + String.format("failed to read record header: expected %d bytes, actual %d bytes, recoverStartOffset: %d", RECORD_HEADER_SIZE, read, recoverStartOffset) ); } @@ -243,20 +229,12 @@ private void parseRecordBody(long recoverStartOffset, SlidingWindowService.Recor ByteBuf recordBody, Function logicalToPhysical) throws ReadRecordException { long recordBodyOffset = readRecordHeader.getRecordBodyOffset(); int recordBodyLength = readRecordHeader.getRecordBodyLength(); - try { - long position = logicalToPhysical.apply(recordBodyOffset); - int read = walChannel.read(recordBody, position); - if (read != recordBodyLength) { - throw new ReadRecordException( - WALUtil.alignNextBlock(recoverStartOffset + RECORD_HEADER_SIZE + recordBodyLength), - String.format("failed to read record body: expected %d bytes, actual %d bytes, recoverStartOffset: %d", recordBodyLength, read, recoverStartOffset) - ); - } - } catch (IOException e) { - LOGGER.error("failed to read record body, position: {}, recoverStartOffset: {}", recordBodyOffset, recoverStartOffset, e); + long position = logicalToPhysical.apply(recordBodyOffset); + int read = walChannel.retryRead(recordBody, position); + if (read != recordBodyLength) { throw new ReadRecordException( WALUtil.alignNextBlock(recoverStartOffset + RECORD_HEADER_SIZE + recordBodyLength), - String.format("failed to read record body, recoverStartOffset: %d", recoverStartOffset) + String.format("failed to read record body: expected %d bytes, actual %d bytes, recoverStartOffset: %d", recordBodyLength, read, recoverStartOffset) ); } @@ -324,7 +302,7 @@ private WALHeader tryReadWALHeader(WALChannel walChannel) { for (int i = 0; i < WAL_HEADER_COUNT; i++) { ByteBuf buf = DirectByteBufAlloc.byteBuffer(WALHeader.WAL_HEADER_SIZE); try { - int read = walChannel.read(buf, i * WAL_HEADER_CAPACITY); + int read = walChannel.retryRead(buf, i * WAL_HEADER_CAPACITY); if (read != WALHeader.WAL_HEADER_SIZE) { continue; } @@ -332,7 +310,7 @@ private WALHeader tryReadWALHeader(WALChannel walChannel) { if (header == null || header.getLastWriteTimestamp() < tmpHeader.getLastWriteTimestamp()) { header = tmpHeader; } - } catch (IOException | UnmarshalException ignored) { + } catch (UnmarshalException ignored) { // failed to parse WALHeader, ignore } finally { buf.release(); @@ -345,7 +323,7 @@ private WALHeader newWALHeader() { return new WALHeader(walChannel.capacity(), initialWindowSize); } - private void walHeaderReady(WALHeader header) throws IOException { + private void walHeaderReady(WALHeader header) { if (nodeId != NOOP_NODE_ID) { header.setNodeId(nodeId); header.setEpoch(epoch); @@ -374,11 +352,7 @@ public void shutdownGracefully() { boolean gracefulShutdown = Optional.ofNullable(slidingWindowService) .map(s -> s.shutdown(1, TimeUnit.DAYS)) .orElse(true); - try { - flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY); - } catch (IOException e) { - LOGGER.error("failed to flush WALHeader when shutdown gracefully", e); - } + flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY); walChannel.close(); @@ -507,13 +481,7 @@ private CompletableFuture trim(long offset, boolean internal) { } walHeader.updateTrimOffset(offset); - return CompletableFuture.runAsync(() -> { - try { - flushWALHeader(); - } catch (IOException e) { - throw new CompletionException(e); - } - }, walHeaderFlusher); + return CompletableFuture.runAsync(this::flushWALHeader, walHeaderFlusher); } private void checkStarted() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java index 881a87152..0cd549ec2 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java @@ -21,7 +21,6 @@ import com.automq.stream.utils.ThreadUtils; import com.automq.stream.utils.Threads; import io.netty.buffer.ByteBuf; -import java.io.IOException; import java.util.Collection; import java.util.LinkedList; import java.util.PriorityQueue; @@ -348,17 +347,17 @@ private long wroteBlocksLocked(BlockBatch wroteBlocks) { return writingBlocks.peek(); } - private void writeBlockData(BlockBatch blocks) throws IOException { + private void writeBlockData(BlockBatch blocks) { TimerUtil timer = new TimerUtil(); for (Block block : blocks.blocks()) { long position = WALUtil.recordOffsetToPosition(block.startOffset(), walChannel.capacity(), WAL_HEADER_TOTAL_CAPACITY); - walChannel.write(block.data(), position); + walChannel.retryWrite(block.data(), position); } - walChannel.flush(); + walChannel.retryFlush(); StorageOperationStats.getInstance().appendWALWriteStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); } - private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException { + private void makeWriteOffsetMatchWindow(long newWindowEndOffset) { // align to block size newWindowEndOffset = WALUtil.alignLargeByBlockSize(newWindowEndOffset); long windowStartOffset = windowCoreData.getStartOffset(); @@ -372,7 +371,7 @@ private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOExcept } public interface WALHeaderFlusher { - void flush() throws IOException; + void flush(); } public static class RecordHeaderCoreData { @@ -500,7 +499,7 @@ public void updateWindowStartOffset(long offset) { this.startOffset.accumulateAndGet(offset, Math::max); } - public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) throws IOException { + public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) { boolean scaleWindowHappened = false; scaleOutLock.lock(); try { @@ -535,36 +534,37 @@ public WriteBlockProcessor(BlockBatch blocks) { @Override public void run() { StorageOperationStats.getInstance().appendWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); - writeBlock(this.blocks); - } - - private void writeBlock(BlockBatch blocks) { try { - makeWriteOffsetMatchWindow(blocks.endOffset()); - writeBlockData(blocks); - - TimerUtil timer = new TimerUtil(); - // Update the start offset of the sliding window after finishing writing the record. - windowCoreData.updateWindowStartOffset(wroteBlocks(blocks)); - - FutureUtil.complete(blocks.futures(), new AppendResult.CallbackResult() { - @Override - public long flushedOffset() { - return windowCoreData.getStartOffset(); - } - - @Override - public String toString() { - return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}'; - } - }); - StorageOperationStats.getInstance().appendWALAfterStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); + writeBlock(this.blocks); } catch (Exception e) { + // should not happen, but just in case FutureUtil.completeExceptionally(blocks.futures(), e); LOGGER.error(String.format("failed to write blocks, startOffset: %s", blocks.startOffset()), e); } finally { blocks.release(); } } + + private void writeBlock(BlockBatch blocks) { + makeWriteOffsetMatchWindow(blocks.endOffset()); + writeBlockData(blocks); + + TimerUtil timer = new TimerUtil(); + // Update the start offset of the sliding window after finishing writing the record. + windowCoreData.updateWindowStartOffset(wroteBlocks(blocks)); + + FutureUtil.complete(blocks.futures(), new AppendResult.CallbackResult() { + @Override + public long flushedOffset() { + return windowCoreData.getStartOffset(); + } + + @Override + public String toString() { + return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}'; + } + }); + StorageOperationStats.getInstance().appendWALAfterStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); + } } }