Skip to content

Commit

Permalink
fix(s3stream/wal): fail all IO operations once the WAL is failed (#1841)
Browse files Browse the repository at this point in the history
fix(s3stream/wal): fail all IO operations once the WAL is failed (#1840)

* refactor: remove `WALChannel#writeAndFlush`



* refactor: introduce `WALChannel#retry`



* refactor: introduce `AbstractWALChannel`



* fix(s3stream/wal): fail all IO operations once the WAL is failed



* refactor: check failed before each IO operation



---------

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Aug 18, 2024
1 parent a3f3c9f commit 34ba11e
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ private synchronized void flushWALHeader() throws IOException {
walHeader.setLastWriteTimestamp(System.nanoTime());
long trimOffset = walHeader.getTrimOffset();
ByteBuf buf = walHeader.marshal();
this.walChannel.retryWriteAndFlush(buf, position);
this.walChannel.retryWrite(buf, position);
this.walChannel.retryFlush();
buf.release();
walHeader.updateFlushedTrimOffset(trimOffset);
LOGGER.debug("WAL header flushed, position: {}, header: {}", position, walHeader);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.wal.util;

import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton {@link WALChannel} that guards every IO entry point with a sticky failure flag and
 * builds the retrying variants on top of the plain ones.
 * <p>
 * Subclasses provide the actual IO through {@link #doWrite(ByteBuf, long)}, {@link #doFlush()}
 * and {@link #doRead(ByteBuf, long, int)}.
 */
public abstract class AbstractWALChannel implements WALChannel {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWALChannel.class);

    /**
     * Sticky marker telling whether the WAL is considered failed.
     * It is raised when a retried operation is still failing after its retry timeout has elapsed,
     * and nothing in this class ever lowers it again.
     * While it is raised, every IO method of this class throws immediately.
     */
    private volatile boolean failed = false;

    /**
     * Writes {@code src} at {@code position} via {@link #doWrite(ByteBuf, long)}.
     *
     * @throws IOException if the WAL has already failed, or if the underlying write fails
     */
    @Override
    public void write(ByteBuf src, long position) throws IOException {
        checkFailed();
        doWrite(src, position);
    }

    /**
     * Repeats {@link #write(ByteBuf, long)} until it succeeds or the retry timeout elapses.
     *
     * @throws IOException if the WAL has failed, or the last attempt failed after the timeout
     */
    @Override
    public void retryWrite(ByteBuf src, long position, long retryIntervalMillis,
        long retryTimeoutMillis) throws IOException {
        checkFailed();
        IORunnable attempt = () -> write(src, position);
        retry(attempt, retryIntervalMillis, retryTimeoutMillis);
    }

    /**
     * Flushes via {@link #doFlush()}.
     *
     * @throws IOException if the WAL has already failed, or if the underlying flush fails
     */
    @Override
    public void flush() throws IOException {
        checkFailed();
        doFlush();
    }

    /**
     * Repeats {@link #flush()} until it succeeds or the retry timeout elapses.
     *
     * @throws IOException if the WAL has failed, or the last attempt failed after the timeout
     */
    @Override
    public void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
        checkFailed();
        IORunnable attempt = this::flush;
        retry(attempt, retryIntervalMillis, retryTimeoutMillis);
    }

    /**
     * Reads via {@link #doRead(ByteBuf, long, int)} and returns whatever it returns.
     *
     * @throws IOException if the WAL has already failed, or if the underlying read fails
     */
    @Override
    public int read(ByteBuf dst, long position, int length) throws IOException {
        checkFailed();
        return doRead(dst, position, length);
    }

    /**
     * Repeats {@link #read(ByteBuf, long, int)} until it succeeds or the retry timeout elapses.
     *
     * @return the value returned by the first successful read
     * @throws IOException if the WAL has failed, or the last attempt failed after the timeout
     */
    @Override
    public int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis,
        long retryTimeoutMillis) throws IOException {
        checkFailed();
        IOSupplier<Integer> attempt = () -> read(dst, position, length);
        return retry(attempt, retryIntervalMillis, retryTimeoutMillis);
    }

    /**
     * Void flavour of {@link #retry(IOSupplier, long, long)}: adapts the action to a supplier
     * and discards the (always {@code null}) result.
     */
    private void retry(IORunnable runnable, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
        IOSupplier<Void> adapted = IOSupplier.from(runnable);
        retry(adapted, retryIntervalMillis, retryTimeoutMillis);
    }

    /**
     * Invokes {@code supplier} repeatedly until it returns normally.
     * <p>
     * The timeout is only evaluated after a failed attempt, so at least one attempt is always made.
     * When an attempt fails after the timeout has elapsed, the WAL is marked as failed and that
     * attempt's exception is rethrown. Otherwise the loop sleeps for the retry interval and tries
     * again, unless the WAL has been marked as failed in the meantime.
     *
     * @param supplier            the IO operation to attempt
     * @param retryIntervalMillis pause between two attempts, in milliseconds
     * @param retryTimeoutMillis  total time budget measured from the first attempt, in milliseconds
     * @return the value produced by the first successful attempt
     * @throws IOException the last attempt's exception on timeout, or a "WAL failed" exception
     */
    private <T> T retry(IOSupplier<T> supplier, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
        final long beginNanos = System.nanoTime();
        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
        for (;;) {
            try {
                return supplier.get();
            } catch (IOException e) {
                boolean timedOut = System.nanoTime() - beginNanos > timeoutNanos;
                if (timedOut) {
                    // Out of budget: from now on every IO operation on this channel fails fast.
                    failed = true;
                    LOGGER.error("Failed to execute IO operation, retry timeout", e);
                    throw e;
                }
                // Stop retrying as soon as the flag has been raised (e.g. by another retry loop).
                checkFailed();
                LOGGER.warn("Failed to execute IO operation, retrying in {}ms, error: {}", retryIntervalMillis, e.getMessage());
                Threads.sleep(retryIntervalMillis);
            }
        }
    }

    /**
     * Throws (after logging) if the WAL has been marked as failed; otherwise does nothing.
     */
    private void checkFailed() throws IOException {
        if (!failed) {
            return;
        }
        IOException failure = new IOException("Failed to execute IO operation, WAL failed");
        LOGGER.error("Failed to execute IO operation, WAL failed", failure);
        throw failure;
    }

    /**
     * Performs the actual write; called by {@link #write(ByteBuf, long)} after the failure check.
     */
    protected abstract void doWrite(ByteBuf src, long position) throws IOException;

    /**
     * Performs the actual flush; called by {@link #flush()} after the failure check.
     */
    protected abstract void doFlush() throws IOException;

    /**
     * Performs the actual read; called by {@link #read(ByteBuf, long, int)} after the failure check.
     */
    protected abstract int doRead(ByteBuf dst, long position, int length) throws IOException;

    /**
     * A value-returning operation that may throw {@link IOException}.
     */
    @FunctionalInterface
    private interface IOSupplier<T> {
        T get() throws IOException;

        /**
         * Wraps a void operation as a supplier that runs it and then yields {@code null}.
         */
        static IOSupplier<Void> from(IORunnable runnable) {
            return () -> {
                runnable.run();
                return null;
            };
        }
    }

    /**
     * A void operation that may throw {@link IOException}.
     */
    @FunctionalInterface
    private interface IORunnable {
        void run() throws IOException;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET;
import static com.automq.stream.s3.wal.util.WALUtil.isBlockDevice;

public class WALBlockDeviceChannel implements WALChannel {
public class WALBlockDeviceChannel extends AbstractWALChannel {
private static final Logger LOGGER = LoggerFactory.getLogger(WALBlockDeviceChannel.class);
private static final String CHECK_DIRECT_IO_AVAILABLE_FORMAT = "%s.check_direct_io_available";
final String path;
Expand Down Expand Up @@ -237,7 +237,7 @@ private ByteBuffer getBuffer(int alignedSize) {
}

@Override
public void write(ByteBuf src, long position) throws IOException {
public void doWrite(ByteBuf src, long position) throws IOException {
if (unalignedWrite) {
// unaligned write, just used for testing
unalignedWrite(src, position);
Expand Down Expand Up @@ -295,11 +295,11 @@ private int write(ByteBuffer src, long position) throws IOException {
}

@Override
public void flush() {
public void doFlush() {
}

@Override
public int read(ByteBuf dst, long position, int length) throws IOException {
public int doRead(ByteBuf dst, long position, int length) throws IOException {
long start = position;
length = Math.min(length, dst.writableBytes());
long end = position + length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,25 @@ public static WALCachedChannel of(WALChannel channel, int cacheSize) {
return new WALCachedChannel(channel, cacheSize);
}

@Override
public int read(ByteBuf dst, long position, int length) throws IOException {
return read(channel::read, dst, position, length);
}

@Override
public int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis,
long retryTimeoutMillis) throws IOException {
Reader reader = (buf, pos, len) -> channel.retryRead(buf, pos, len, retryIntervalMillis, retryTimeoutMillis);
return read(reader, dst, position, length);
}

/**
* As we use a common cache for all threads, we need to synchronize the read.
*/
@Override
public synchronized int read(ByteBuf dst, long position, int length) throws IOException {
private synchronized int read(Reader reader, ByteBuf dst, long position, int length) throws IOException {
if (CAPACITY_NOT_SET == channel.capacity()) {
// If we don't know the capacity now, we can't cache.
return channel.read(dst, position, length);
return reader.read(dst, position, length);
}

long start = position;
Expand All @@ -60,7 +71,7 @@ public synchronized int read(ByteBuf dst, long position, int length) throws IOEx
ByteBuf cache = getCache();
if (length > cache.capacity()) {
// If the length is larger than the cache capacity, we can't cache.
return channel.read(dst, position, length);
return reader.read(dst, position, length);
}

boolean fallWithinCache = cachePosition >= 0 && cachePosition <= start && end <= cachePosition + cache.readableBytes();
Expand All @@ -69,7 +80,7 @@ public synchronized int read(ByteBuf dst, long position, int length) throws IOEx
cachePosition = start;
// Make sure the cache is not larger than the channel capacity.
int cacheLength = (int) Math.min(cache.writableBytes(), channel.capacity() - cachePosition);
channel.read(cache, cachePosition, cacheLength);
reader.read(cache, cachePosition, cacheLength);
}

// Now the cache is ready.
Expand Down Expand Up @@ -107,6 +118,10 @@ private ByteBuf getCache() {
return this.cache;
}

private interface Reader {
int read(ByteBuf dst, long position, int length) throws IOException;
}

@Override
public void open(CapacityReader reader) throws IOException {
this.channel.open(reader);
Expand All @@ -127,11 +142,22 @@ public void write(ByteBuf src, long position) throws IOException {
this.channel.write(src, position);
}

@Override
public void retryWrite(ByteBuf src, long position, long retryIntervalMillis,
long retryTimeoutMillis) throws IOException {
channel.retryWrite(src, position, retryIntervalMillis, retryTimeoutMillis);
}

@Override
public void flush() throws IOException {
this.channel.flush();
}

@Override
public void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
channel.retryFlush(retryIntervalMillis, retryTimeoutMillis);
}

@Override
public boolean useDirectIO() {
return channel.useDirectIO();
Expand Down
101 changes: 7 additions & 94 deletions s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import com.automq.stream.s3.wal.exception.WALCapacityMismatchException;
import com.automq.stream.s3.wal.exception.WALNotInitializedException;
import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.IOException;
Expand All @@ -31,8 +30,6 @@
*/
public interface WALChannel {

Logger LOGGER = LoggerFactory.getLogger(WALChannel.class);

long DEFAULT_RETRY_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100);
long DEFAULT_RETRY_TIMEOUT = TimeUnit.MINUTES.toMillis(1);

Expand Down Expand Up @@ -77,24 +74,7 @@ default void retryWrite(ByteBuf src, long position) throws IOException {
/**
* Retry {@link #write(ByteBuf, long)} with the given interval until success or timeout.
*/
default void retryWrite(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
long start = System.nanoTime();
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
while (true) {
try {
write(src, position);
break;
} catch (IOException e) {
if (System.nanoTime() - start > retryTimeoutNanos) {
LOGGER.error("Failed to write, retry timeout", e);
throw e;
} else {
LOGGER.error("Failed to write, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
}
}
}
}
void retryWrite(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException;

/**
* Flush to disk.
Expand All @@ -108,57 +88,10 @@ default void retryFlush() throws IOException {
/**
* Retry {@link #flush()} with the given interval until success or timeout.
*/
default void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
long start = System.nanoTime();
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
while (true) {
try {
flush();
break;
} catch (IOException e) {
if (System.nanoTime() - start > retryTimeoutNanos) {
LOGGER.error("Failed to flush, retry timeout", e);
throw e;
} else {
LOGGER.error("Failed to flush, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
}
}
}
}

/**
* Call {@link #write(ByteBuf, long)} and {@link #flush()}.
*/
default void writeAndFlush(ByteBuf src, long position) throws IOException {
write(src, position);
flush();
}

default void retryWriteAndFlush(ByteBuf src, long position) throws IOException {
retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
}
void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException;

/**
* Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success or timeout.
*/
default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
long start = System.nanoTime();
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
while (true) {
try {
writeAndFlush(src, position);
break;
} catch (IOException e) {
if (System.nanoTime() - start > retryTimeoutNanos) {
LOGGER.error("Failed to write and flush, retry timeout", e);
throw e;
} else {
LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
}
}
}
default int read(ByteBuf dst, long position) throws IOException {
return read(dst, position, dst.writableBytes());
}

/**
Expand All @@ -171,34 +104,14 @@ default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMi
*/
int read(ByteBuf dst, long position, int length) throws IOException;

default int read(ByteBuf dst, long position) throws IOException {
return read(dst, position, dst.writableBytes());
}

default int retryRead(ByteBuf dst, long position) throws IOException {
return retryRead(dst, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
return retryRead(dst, position, dst.writableBytes(), DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
}

/**
* Retry {@link #read(ByteBuf, long)} with the given interval until success or timeout.
* Retry {@link #read(ByteBuf, long, int)} with the given interval until success or timeout.
*/
default int retryRead(ByteBuf dst, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
long start = System.nanoTime();
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
while (true) {
try {
return read(dst, position);
} catch (IOException e) {
if (System.nanoTime() - start > retryTimeoutNanos) {
LOGGER.error("Failed to read, retry timeout", e);
throw e;
} else {
LOGGER.error("Failed to read, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
}
}
}
}
int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis, long retryTimeoutMillis) throws IOException;

boolean useDirectIO();

Expand Down
Loading

0 comments on commit 34ba11e

Please sign in to comment.