From 0b62e5f83254dc212cdddfe1384c6ae60d1bbb61 Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Sat, 17 Aug 2024 22:04:57 +0800 Subject: [PATCH] fix(s3stream/wal): fail all IO operations once the WAL is failed (#1840) * refactor: remove `WALChannel#writeAndFlush` Signed-off-by: Ning Yu * refactor: introduce `WALChannel#retry` Signed-off-by: Ning Yu * refactor: introduce `AbstractWALChannel` Signed-off-by: Ning Yu * fix(s3stream/wal): fail all IO operations once the WAL is failed Signed-off-by: Ning Yu * refactor: check failed before each IO operation Signed-off-by: Ning Yu --------- Signed-off-by: Ning Yu --- .../s3/wal/impl/block/BlockWALService.java | 3 +- .../s3/wal/util/AbstractWALChannel.java | 121 ++++++++++++++++++ .../s3/wal/util/WALBlockDeviceChannel.java | 8 +- .../stream/s3/wal/util/WALCachedChannel.java | 36 +++++- .../automq/stream/s3/wal/util/WALChannel.java | 101 +-------------- .../stream/s3/wal/util/WALFileChannel.java | 8 +- .../wal/impl/block/BlockWALServiceTest.java | 15 ++- .../wal/util/WALBlockDeviceChannelTest.java | 21 +-- .../stream/s3/wal/util/WALChannelTest.java | 6 +- 9 files changed, 196 insertions(+), 123 deletions(-) create mode 100644 s3stream/src/main/java/com/automq/stream/s3/wal/util/AbstractWALChannel.java diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java index 5720d8ece4..0d1ce2d992 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java @@ -172,7 +172,8 @@ private synchronized void flushWALHeader() throws IOException { walHeader.setLastWriteTimestamp(System.nanoTime()); long trimOffset = walHeader.getTrimOffset(); ByteBuf buf = walHeader.marshal(); - this.walChannel.retryWriteAndFlush(buf, position); + this.walChannel.retryWrite(buf, position); + this.walChannel.retryFlush(); buf.release(); walHeader.updateFlushedTrimOffset(trimOffset); LOGGER.debug("WAL header flushed, position: {}, header: {}", position, walHeader); diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/AbstractWALChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/AbstractWALChannel.java new file mode 100644 index 0000000000..3b6f278c30 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/AbstractWALChannel.java @@ -0,0 +1,121 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.wal.util; + +import com.automq.stream.utils.Threads; +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractWALChannel implements WALChannel { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWALChannel.class); + + /** + * Flag to indicate if the WAL has failed. + * It will be set to true if an IO operation fails continuously, and it will never be reset. + * Any IO operation will fail immediately if this flag is true. + */ + private volatile boolean failed = false; + + @Override + public void write(ByteBuf src, long position) throws IOException { + checkFailed(); + doWrite(src, position); + } + + @Override + public void retryWrite(ByteBuf src, long position, long retryIntervalMillis, + long retryTimeoutMillis) throws IOException { + checkFailed(); + retry(() -> write(src, position), retryIntervalMillis, retryTimeoutMillis); + } + + @Override + public void flush() throws IOException { + checkFailed(); + doFlush(); + } + + @Override + public void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException { + checkFailed(); + retry(this::flush, retryIntervalMillis, retryTimeoutMillis); + } + + @Override + public int read(ByteBuf dst, long position, int length) throws IOException { + checkFailed(); + return doRead(dst, position, length); + } + + @Override + public int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis, + long retryTimeoutMillis) throws IOException { + checkFailed(); + return retry(() -> read(dst, position, length), retryIntervalMillis, retryTimeoutMillis); + } + + private void retry(IORunnable runnable, long retryIntervalMillis, long retryTimeoutMillis) throws IOException { + retry(IOSupplier.from(runnable), retryIntervalMillis, retryTimeoutMillis); + } + + private T retry(IOSupplier supplier, long retryIntervalMillis, long retryTimeoutMillis) throws IOException { + long start = System.nanoTime(); + long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis); + while (true) { + try { + return supplier.get(); + } catch (IOException e) { + if (System.nanoTime() - start > retryTimeoutNanos) { + failed = true; + LOGGER.error("Failed to execute IO operation, retry timeout", e); + throw e; + } + checkFailed(); + LOGGER.warn("Failed to execute IO operation, retrying in {}ms, error: {}", retryIntervalMillis, e.getMessage()); + Threads.sleep(retryIntervalMillis); + } + } + } + + private void checkFailed() throws IOException { + if (failed) { + IOException e = new IOException("Failed to execute IO operation, WAL failed"); + LOGGER.error("Failed to execute IO operation, WAL failed", e); + throw e; + } + } + + protected abstract void doWrite(ByteBuf src, long position) throws IOException; + + protected abstract void doFlush() throws IOException; + + protected abstract int doRead(ByteBuf dst, long position, int length) throws IOException; + + private interface IOSupplier { + T get() throws IOException; + + static IOSupplier from(IORunnable runnable) { + return () -> { + runnable.run(); + return null; + }; + } + } + + private interface IORunnable { + void run() throws IOException; + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java index 4869772cd6..d09a074e6a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java @@ -28,7 +28,7 @@ import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET; import static com.automq.stream.s3.wal.util.WALUtil.isBlockDevice; -public class WALBlockDeviceChannel implements WALChannel { +public class WALBlockDeviceChannel extends AbstractWALChannel { private static final Logger LOGGER = LoggerFactory.getLogger(WALBlockDeviceChannel.class); private static final String CHECK_DIRECT_IO_AVAILABLE_FORMAT = "%s.check_direct_io_available"; final String path; @@ -237,7 +237,7 @@ private ByteBuffer getBuffer(int alignedSize) { } @Override - public void write(ByteBuf src, long position) throws IOException { + public void doWrite(ByteBuf src, long position) throws IOException { if (unalignedWrite) { // unaligned write, just used for testing unalignedWrite(src, position); @@ -295,11 +295,11 @@ private int write(ByteBuffer src, long position) throws IOException { } @Override - public void flush() { + public void doFlush() { } @Override - public int read(ByteBuf dst, long position, int length) throws IOException { + public int doRead(ByteBuf dst, long position, int length) throws IOException { long start = position; length = Math.min(length, dst.writableBytes()); long end = position + length; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java index eff476b8a4..8390c2b49c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java @@ -43,14 +43,25 @@ public static WALCachedChannel of(WALChannel channel, int cacheSize) { return new WALCachedChannel(channel, cacheSize); } + @Override + public int read(ByteBuf dst, long position, int length) throws IOException { + return read(channel::read, dst, position, length); + } + + @Override + public int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis, + long retryTimeoutMillis) throws IOException { + Reader reader = (buf, pos, len) -> channel.retryRead(buf, pos, len, retryIntervalMillis, retryTimeoutMillis); + return read(reader, dst, position, length); + } + /** * As we use a common cache for all threads, we need to synchronize the read. */ - @Override - public synchronized int read(ByteBuf dst, long position, int length) throws IOException { + private synchronized int read(Reader reader, ByteBuf dst, long position, int length) throws IOException { if (CAPACITY_NOT_SET == channel.capacity()) { // If we don't know the capacity now, we can't cache. - return channel.read(dst, position, length); + return reader.read(dst, position, length); } long start = position; @@ -60,7 +71,7 @@ public synchronized int read(ByteBuf dst, long position, int length) throws IOEx ByteBuf cache = getCache(); if (length > cache.capacity()) { // If the length is larger than the cache capacity, we can't cache. - return channel.read(dst, position, length); + return reader.read(dst, position, length); } boolean fallWithinCache = cachePosition >= 0 && cachePosition <= start && end <= cachePosition + cache.readableBytes(); @@ -69,7 +80,7 @@ public synchronized int read(ByteBuf dst, long position, int length) throws IOEx cachePosition = start; // Make sure the cache is not larger than the channel capacity. int cacheLength = (int) Math.min(cache.writableBytes(), channel.capacity() - cachePosition); - channel.read(cache, cachePosition, cacheLength); + reader.read(cache, cachePosition, cacheLength); } // Now the cache is ready. @@ -107,6 +118,10 @@ private ByteBuf getCache() { return this.cache; } + private interface Reader { + int read(ByteBuf dst, long position, int length) throws IOException; + } + @Override public void open(CapacityReader reader) throws IOException { this.channel.open(reader); @@ -127,11 +142,22 @@ public void write(ByteBuf src, long position) throws IOException { this.channel.write(src, position); } + @Override + public void retryWrite(ByteBuf src, long position, long retryIntervalMillis, + long retryTimeoutMillis) throws IOException { + channel.retryWrite(src, position, retryIntervalMillis, retryTimeoutMillis); + } + @Override public void flush() throws IOException { this.channel.flush(); } + @Override + public void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException { + channel.retryFlush(retryIntervalMillis, retryTimeoutMillis); + } + @Override public boolean useDirectIO() { return channel.useDirectIO(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java index a4ab990784..0fe533773f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java @@ -13,7 +13,6 @@ import com.automq.stream.s3.wal.exception.WALCapacityMismatchException; import com.automq.stream.s3.wal.exception.WALNotInitializedException; -import com.automq.stream.utils.Threads; import io.netty.buffer.ByteBuf; import java.io.File; import java.io.IOException; @@ -31,8 +30,6 @@ */ public interface WALChannel { - Logger LOGGER = LoggerFactory.getLogger(WALChannel.class); - long DEFAULT_RETRY_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100); long DEFAULT_RETRY_TIMEOUT = TimeUnit.MINUTES.toMillis(1); @@ -77,24 +74,7 @@ default void retryWrite(ByteBuf src, long position) throws IOException { /** * Retry {@link #write(ByteBuf, long)} with the given interval until success or timeout. */ - default void retryWrite(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException { - long start = System.nanoTime(); - long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis); - while (true) { - try { - write(src, position); - break; - } catch (IOException e) { - if (System.nanoTime() - start > retryTimeoutNanos) { - LOGGER.error("Failed to write, retry timeout", e); - throw e; - } else { - LOGGER.error("Failed to write, retrying in {}ms", retryIntervalMillis, e); - Threads.sleep(retryIntervalMillis); - } - } - } - } + void retryWrite(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException; /** * Flush to disk. @@ -108,57 +88,10 @@ default void retryFlush() throws IOException { /** * Retry {@link #flush()} with the given interval until success or timeout. */ - default void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException { - long start = System.nanoTime(); - long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis); - while (true) { - try { - flush(); - break; - } catch (IOException e) { - if (System.nanoTime() - start > retryTimeoutNanos) { - LOGGER.error("Failed to flush, retry timeout", e); - throw e; - } else { - LOGGER.error("Failed to flush, retrying in {}ms", retryIntervalMillis, e); - Threads.sleep(retryIntervalMillis); - } - } - } - } - - /** - * Call {@link #write(ByteBuf, long)} and {@link #flush()}. - */ - default void writeAndFlush(ByteBuf src, long position) throws IOException { - write(src, position); - flush(); - } - - default void retryWriteAndFlush(ByteBuf src, long position) throws IOException { - retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT); - } + void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException; - /** - * Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success or timeout. - */ - default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException { - long start = System.nanoTime(); - long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis); - while (true) { - try { - writeAndFlush(src, position); - break; - } catch (IOException e) { - if (System.nanoTime() - start > retryTimeoutNanos) { - LOGGER.error("Failed to write and flush, retry timeout", e); - throw e; - } else { - LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e); - Threads.sleep(retryIntervalMillis); - } - } - } + default int read(ByteBuf dst, long position) throws IOException { + return read(dst, position, dst.writableBytes()); } /** @@ -171,34 +104,14 @@ default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMi */ int read(ByteBuf dst, long position, int length) throws IOException; - default int read(ByteBuf dst, long position) throws IOException { - return read(dst, position, dst.writableBytes()); - } - default int retryRead(ByteBuf dst, long position) throws IOException { - return retryRead(dst, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT); + return retryRead(dst, position, dst.writableBytes(), DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT); } /** - * Retry {@link #read(ByteBuf, long)} with the given interval until success or timeout. + * Retry {@link #read(ByteBuf, long, int)} with the given interval until success or timeout. */ - default int retryRead(ByteBuf dst, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException { - long start = System.nanoTime(); - long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis); - while (true) { - try { - return read(dst, position); - } catch (IOException e) { - if (System.nanoTime() - start > retryTimeoutNanos) { - LOGGER.error("Failed to read, retry timeout", e); - throw e; - } else { - LOGGER.error("Failed to read, retrying in {}ms", retryIntervalMillis, e); - Threads.sleep(retryIntervalMillis); - } - } - } - } + int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis, long retryTimeoutMillis) throws IOException; boolean useDirectIO(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java index 431b797e3d..4fce97977c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java @@ -22,7 +22,7 @@ import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET; -public class WALFileChannel implements WALChannel { +public class WALFileChannel extends AbstractWALChannel { final String filePath; final long fileCapacityWant; /** @@ -107,7 +107,7 @@ public String path() { } @Override - public void write(ByteBuf src, long position) throws IOException { + public void doWrite(ByteBuf src, long position) throws IOException { assert src.readableBytes() + position <= capacity(); ByteBuffer[] nioBuffers = src.nioBuffers(); for (ByteBuffer nioBuffer : nioBuffers) { @@ -117,12 +117,12 @@ public void write(ByteBuf src, long position) throws IOException { } @Override - public void flush() throws IOException { + public void doFlush() throws IOException { fileChannel.force(false); } @Override - public int read(ByteBuf dst, long position, int length) throws IOException { + public int doRead(ByteBuf dst, long position, int length) throws IOException { length = Math.min(length, dst.writableBytes()); assert position + length <= capacity(); int bytesRead = 0; diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/impl/block/BlockWALServiceTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/impl/block/BlockWALServiceTest.java index 906f13745d..c47336781d 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/impl/block/BlockWALServiceTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/impl/block/BlockWALServiceTest.java @@ -16,12 +16,12 @@ import com.automq.stream.s3.wal.AppendResult; import com.automq.stream.s3.wal.RecoverResult; import com.automq.stream.s3.wal.WriteAheadLog; +import com.automq.stream.s3.wal.benchmark.WriteBench; import com.automq.stream.s3.wal.common.RecordHeader; import com.automq.stream.s3.wal.exception.OverCapacityException; import com.automq.stream.s3.wal.exception.WALCapacityMismatchException; import com.automq.stream.s3.wal.exception.WALNotInitializedException; import com.automq.stream.s3.wal.impl.block.BlockWALService.RecoverIterator; -import com.automq.stream.s3.wal.benchmark.WriteBench; import com.automq.stream.s3.wal.util.WALBlockDeviceChannel; import com.automq.stream.s3.wal.util.WALChannel; import com.automq.stream.s3.wal.util.WALUtil; @@ -914,14 +914,14 @@ private void write(WALChannel walChannel, long logicOffset, int recordSize) thro record.addComponents(true, recordHeader, recordBody); long position = WALUtil.recordOffsetToPosition(logicOffset, walChannel.capacity(), WAL_HEADER_TOTAL_CAPACITY); - walChannel.writeAndFlush(record, position); + writeAndFlush(walChannel, record, position); } private void writeWALHeader(WALChannel walChannel, long trimOffset, long maxLength) throws IOException { ByteBuf header = new BlockWALHeader(walChannel.capacity(), maxLength) .updateTrimOffset(trimOffset) .marshal(); - walChannel.writeAndFlush(header, 0); + writeAndFlush(walChannel, header, 0); } @ParameterizedTest(name = "Test {index} {0}") @@ -1214,7 +1214,7 @@ private void testCapacityMismatchInHeader0(boolean directIO) throws IOException .direct(directIO) .build(); walChannel.open(); - walChannel.writeAndFlush(new BlockWALHeader(capacity2, 42).marshal(), 0); + writeAndFlush(walChannel, new BlockWALHeader(capacity2, 42).marshal(), 0); walChannel.close(); // try to open it with capacity1 @@ -1278,7 +1278,7 @@ private void testRecoveryModeNoHeader0(boolean directIO) throws IOException { .direct(directIO) .build(); walChannel.open(); - walChannel.writeAndFlush(Unpooled.buffer(WAL_HEADER_TOTAL_CAPACITY).writeZero(WAL_HEADER_TOTAL_CAPACITY), 0); + writeAndFlush(walChannel, Unpooled.buffer(WAL_HEADER_TOTAL_CAPACITY).writeZero(WAL_HEADER_TOTAL_CAPACITY), 0); walChannel.close(); // try to open it in recovery mode @@ -1344,6 +1344,11 @@ private void testResetWithoutRecover0(int recordCount, boolean directIO) throws } } + private void writeAndFlush(WALChannel channel, ByteBuf src, long position) throws IOException { + channel.write(src, position); + channel.flush(); + } + private static class RecoverFromDisasterParam { int recordSize; long capacity; diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java index 50c8e8a62b..262ac33f2b 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java @@ -54,7 +54,7 @@ public void testSingleThreadWriteBasic() throws IOException { for (int i = 0; i < count; i++) { ByteBuf data = TestUtils.random(size); long pos = WALUtil.alignLargeByBlockSize(size) * i; - channel.writeAndFlush(data, pos); + writeAndFlush(channel, data, pos); } channel.close(); @@ -77,7 +77,7 @@ public void testSingleThreadWriteComposite() throws IOException { data.addComponent(true, TestUtils.random(size)); } long pos = WALUtil.alignLargeByBlockSize(maxSize) * i; - channel.writeAndFlush(data, pos); + writeAndFlush(channel, data, pos); } channel.close(); @@ -101,7 +101,7 @@ public void testMultiThreadWrite() throws IOException, InterruptedException { ByteBuf data = TestUtils.random(size); long pos = WALUtil.alignLargeByBlockSize(size) * index; try { - channel.writeAndFlush(data, pos); + writeAndFlush(channel, data, pos); } catch (IOException e) { throw new RuntimeException(e); } @@ -120,7 +120,7 @@ public void testWriteNotAlignedBufferSize() throws IOException { ByteBuf data = TestUtils.random(42); // It's ok to do this - assertDoesNotThrow(() -> channel.writeAndFlush(data, 0)); + assertDoesNotThrow(() -> writeAndFlush(channel, data, 0)); channel.close(); } @@ -131,7 +131,7 @@ public void testWriteNotAlignedPosition() throws IOException { channel.open(); ByteBuf data = TestUtils.random(4096); - assertThrows(AssertionError.class, () -> channel.writeAndFlush(data, 42)); + assertThrows(AssertionError.class, () -> writeAndFlush(channel, data, 42)); channel.close(); } @@ -142,7 +142,7 @@ public void testWriteOutOfBound() throws IOException { channel.open(); ByteBuf data = TestUtils.random(4096); - assertThrows(AssertionError.class, () -> channel.writeAndFlush(data, 8192)); + assertThrows(AssertionError.class, () -> writeAndFlush(channel, data, 8192)); channel.close(); } @@ -163,7 +163,7 @@ public void testReadBasic() throws IOException { ByteBuf data = TestUtils.random(size); long pos = ThreadLocalRandom.current().nextLong(0, capacity - size); pos = WALUtil.alignSmallByBlockSize(pos); - wChannel.writeAndFlush(data, pos); + writeAndFlush(wChannel, data, pos); ByteBuf buf = Unpooled.buffer(size); int read = rChannel.read(buf, pos); @@ -191,7 +191,7 @@ public void testReadInside() throws IOException { ByteBuf data = TestUtils.random(size); long pos = ThreadLocalRandom.current().nextLong(0, capacity - size); pos = WALUtil.alignSmallByBlockSize(pos); - wChannel.writeAndFlush(data, pos); + writeAndFlush(wChannel, data, pos); int start = ThreadLocalRandom.current().nextInt(0, size - 1); int end = ThreadLocalRandom.current().nextInt(start + 1, size); @@ -228,4 +228,9 @@ public void testReadNotAlignedPosition() throws IOException { channel.close(); } + + private void writeAndFlush(WALChannel channel, ByteBuf src, long position) throws IOException { + channel.write(src, position); + channel.flush(); + } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALChannelTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALChannelTest.java index a8be74cb62..627fce5575 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALChannelTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALChannelTest.java @@ -57,14 +57,16 @@ void testWriteAndRead() throws IOException { ByteBuf data = TestUtils.random(1024 * 3); for (int i = 0; i < 100; i++) { try { - walChannel.writeAndFlush(data, (long) i * data.readableBytes()); + walChannel.write(data, (long) i * data.readableBytes()); + walChannel.flush(); } catch (IOException e) { throw new RuntimeException(e); } } final String content = "Hello World"; - walChannel.writeAndFlush(Unpooled.wrappedBuffer(content.getBytes()), 100); + walChannel.write(Unpooled.wrappedBuffer(content.getBytes()), 100); + walChannel.flush(); ByteBuf readBuffer = Unpooled.buffer(content.length()); int read = walChannel.read(readBuffer, 100);