Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(s3stream/wal): fail all IO operations once the WAL is failed #1841

Merged
merged 1 commit into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ private synchronized void flushWALHeader() throws IOException {
walHeader.setLastWriteTimestamp(System.nanoTime());
long trimOffset = walHeader.getTrimOffset();
ByteBuf buf = walHeader.marshal();
this.walChannel.retryWriteAndFlush(buf, position);
this.walChannel.retryWrite(buf, position);
this.walChannel.retryFlush();
buf.release();
walHeader.updateFlushedTrimOffset(trimOffset);
LOGGER.debug("WAL header flushed, position: {}, header: {}", position, walHeader);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.wal.util;

import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link WALChannel} implementation that adds a sticky "failed" state and a shared retry loop
 * on top of the raw IO primitives ({@link #doWrite}, {@link #doFlush}, {@link #doRead}) supplied
 * by subclasses.
 *
 * <p>Once a retried operation exhausts its timeout the channel is marked as failed, and every
 * later IO call on this channel is rejected with an {@link IOException} before reaching the
 * subclass primitives.
 */
public abstract class AbstractWALChannel implements WALChannel {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWALChannel.class);

    /**
     * Flag to indicate if the WAL has failed.
     * It will be set to true if an IO operation fails continuously, and it will never be reset.
     * Any IO operation will fail immediately if this flag is true.
     */
    private volatile boolean failed = false;

    /**
     * Writes {@code src} at {@code position} via {@link #doWrite}, unless the WAL has failed.
     *
     * @throws IOException if the WAL has already failed or the underlying write fails
     */
    @Override
    public void write(ByteBuf src, long position) throws IOException {
        checkFailed();
        doWrite(src, position);
    }

    /**
     * Retries {@link #write(ByteBuf, long)} until it succeeds, the timeout elapses or the WAL fails.
     *
     * @throws IOException if the WAL has failed, or the last attempt failed after the timeout
     */
    @Override
    public void retryWrite(ByteBuf src, long position, long retryIntervalMillis,
        long retryTimeoutMillis) throws IOException {
        checkFailed();
        retry(() -> write(src, position), retryIntervalMillis, retryTimeoutMillis);
    }

    /**
     * Flushes via {@link #doFlush}, unless the WAL has failed.
     *
     * @throws IOException if the WAL has already failed or the underlying flush fails
     */
    @Override
    public void flush() throws IOException {
        checkFailed();
        doFlush();
    }

    /**
     * Retries {@link #flush()} until it succeeds, the timeout elapses or the WAL fails.
     *
     * @throws IOException if the WAL has failed, or the last attempt failed after the timeout
     */
    @Override
    public void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
        checkFailed();
        retry(this::flush, retryIntervalMillis, retryTimeoutMillis);
    }

    /**
     * Reads via {@link #doRead}, unless the WAL has failed.
     *
     * @return the value returned by {@link #doRead}
     * @throws IOException if the WAL has already failed or the underlying read fails
     */
    @Override
    public int read(ByteBuf dst, long position, int length) throws IOException {
        checkFailed();
        return doRead(dst, position, length);
    }

    /**
     * Retries {@link #read(ByteBuf, long, int)} until it succeeds, the timeout elapses or the WAL fails.
     *
     * @return the value returned by the first successful read
     * @throws IOException if the WAL has failed, or the last attempt failed after the timeout
     */
    @Override
    public int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis,
        long retryTimeoutMillis) throws IOException {
        checkFailed();
        return retry(() -> read(dst, position, length), retryIntervalMillis, retryTimeoutMillis);
    }

    /**
     * Adapts a value-less IO action to {@link #retry(IOSupplier, long, long)}.
     */
    private void retry(IORunnable runnable, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
        retry(IOSupplier.from(runnable), retryIntervalMillis, retryTimeoutMillis);
    }

    /**
     * Runs {@code supplier} until it returns normally.
     *
     * <p>After each {@link IOException}: if more than {@code retryTimeoutMillis} has elapsed since the
     * first attempt, the WAL is marked as failed and the exception is rethrown; if the WAL has been
     * marked as failed in the meantime, a "WAL failed" exception is thrown with the caught exception
     * attached as its cause; otherwise the loop sleeps {@code retryIntervalMillis} and tries again.
     */
    private <T> T retry(IOSupplier<T> supplier, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
        long start = System.nanoTime();
        long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
        while (true) {
            try {
                return supplier.get();
            } catch (IOException e) {
                if (System.nanoTime() - start > retryTimeoutNanos) {
                    failed = true;
                    LOGGER.error("Failed to execute IO operation, retry timeout", e);
                    throw e;
                }
                // Stop retrying as soon as the WAL is known to be failed; chain the error that
                // interrupted this attempt so it is not silently dropped.
                checkFailed(e);
                LOGGER.warn("Failed to execute IO operation, retrying in {}ms, error: {}", retryIntervalMillis, e.getMessage());
                Threads.sleep(retryIntervalMillis);
            }
        }
    }

    /**
     * Throws if the WAL has been marked as failed; otherwise does nothing.
     */
    private void checkFailed() throws IOException {
        checkFailed(null);
    }

    /**
     * Throws if the WAL has been marked as failed, attaching {@code cause} (may be {@code null})
     * to the thrown exception.
     */
    private void checkFailed(Throwable cause) throws IOException {
        if (failed) {
            IOException e = new IOException("Failed to execute IO operation, WAL failed", cause);
            LOGGER.error("Failed to execute IO operation, WAL failed", e);
            throw e;
        }
    }

    /**
     * Raw write primitive; invoked by {@link #write(ByteBuf, long)} after the failed-state check.
     */
    protected abstract void doWrite(ByteBuf src, long position) throws IOException;

    /**
     * Raw flush primitive; invoked by {@link #flush()} after the failed-state check.
     */
    protected abstract void doFlush() throws IOException;

    /**
     * Raw read primitive; invoked by {@link #read(ByteBuf, long, int)} after the failed-state check.
     */
    protected abstract int doRead(ByteBuf dst, long position, int length) throws IOException;

    /**
     * A supplier whose evaluation may throw {@link IOException}.
     */
    @FunctionalInterface
    private interface IOSupplier<T> {
        T get() throws IOException;

        /**
         * Wraps a value-less IO action as a supplier that returns {@code null}.
         */
        static IOSupplier<Void> from(IORunnable runnable) {
            return () -> {
                runnable.run();
                return null;
            };
        }
    }

    /**
     * An action that may throw {@link IOException}.
     */
    @FunctionalInterface
    private interface IORunnable {
        void run() throws IOException;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET;
import static com.automq.stream.s3.wal.util.WALUtil.isBlockDevice;

public class WALBlockDeviceChannel implements WALChannel {
public class WALBlockDeviceChannel extends AbstractWALChannel {
private static final Logger LOGGER = LoggerFactory.getLogger(WALBlockDeviceChannel.class);
private static final String CHECK_DIRECT_IO_AVAILABLE_FORMAT = "%s.check_direct_io_available";
final String path;
Expand Down Expand Up @@ -237,7 +237,7 @@ private ByteBuffer getBuffer(int alignedSize) {
}

@Override
public void write(ByteBuf src, long position) throws IOException {
public void doWrite(ByteBuf src, long position) throws IOException {
if (unalignedWrite) {
// unaligned write, just used for testing
unalignedWrite(src, position);
Expand Down Expand Up @@ -295,11 +295,11 @@ private int write(ByteBuffer src, long position) throws IOException {
}

@Override
public void flush() {
public void doFlush() {
}

@Override
public int read(ByteBuf dst, long position, int length) throws IOException {
public int doRead(ByteBuf dst, long position, int length) throws IOException {
long start = position;
length = Math.min(length, dst.writableBytes());
long end = position + length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,25 @@ public static WALCachedChannel of(WALChannel channel, int cacheSize) {
return new WALCachedChannel(channel, cacheSize);
}

/**
 * Serves the read through the internal cached read path, using the wrapped channel's plain
 * (non-retrying) read to fetch data.
 */
@Override
public int read(ByteBuf dst, long position, int length) throws IOException {
    Reader plainReader = (buf, pos, len) -> channel.read(buf, pos, len);
    return read(plainReader, dst, position, length);
}

/**
 * Serves the read through the internal cached read path, using the wrapped channel's retrying
 * read (with the given interval and timeout) to fetch data.
 */
@Override
public int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis,
    long retryTimeoutMillis) throws IOException {
    return read(
        (buf, pos, len) -> channel.retryRead(buf, pos, len, retryIntervalMillis, retryTimeoutMillis),
        dst, position, length);
}

/**
* As we use a common cache for all threads, we need to synchronize the read.
*/
@Override
public synchronized int read(ByteBuf dst, long position, int length) throws IOException {
private synchronized int read(Reader reader, ByteBuf dst, long position, int length) throws IOException {
if (CAPACITY_NOT_SET == channel.capacity()) {
// If we don't know the capacity now, we can't cache.
return channel.read(dst, position, length);
return reader.read(dst, position, length);
}

long start = position;
Expand All @@ -60,7 +71,7 @@ public synchronized int read(ByteBuf dst, long position, int length) throws IOEx
ByteBuf cache = getCache();
if (length > cache.capacity()) {
// If the length is larger than the cache capacity, we can't cache.
return channel.read(dst, position, length);
return reader.read(dst, position, length);
}

boolean fallWithinCache = cachePosition >= 0 && cachePosition <= start && end <= cachePosition + cache.readableBytes();
Expand All @@ -69,7 +80,7 @@ public synchronized int read(ByteBuf dst, long position, int length) throws IOEx
cachePosition = start;
// Make sure the cache is not larger than the channel capacity.
int cacheLength = (int) Math.min(cache.writableBytes(), channel.capacity() - cachePosition);
channel.read(cache, cachePosition, cacheLength);
reader.read(cache, cachePosition, cacheLength);
}

// Now the cache is ready.
Expand Down Expand Up @@ -107,6 +118,10 @@ private ByteBuf getCache() {
return this.cache;
}

/**
 * Strategy used by the cached read path to fetch bytes from the wrapped channel; implemented
 * with either the channel's plain read or its retrying read.
 */
@FunctionalInterface
private interface Reader {
    int read(ByteBuf dst, long position, int length) throws IOException;
}

@Override
public void open(CapacityReader reader) throws IOException {
this.channel.open(reader);
Expand All @@ -127,11 +142,22 @@ public void write(ByteBuf src, long position) throws IOException {
this.channel.write(src, position);
}

/**
 * Forwards the retrying write to the wrapped channel unchanged.
 */
@Override
public void retryWrite(ByteBuf src, long position, long retryIntervalMillis,
    long retryTimeoutMillis) throws IOException {
    this.channel.retryWrite(src, position, retryIntervalMillis, retryTimeoutMillis);
}

/**
 * Forwards the flush to the wrapped channel.
 */
@Override
public void flush() throws IOException {
    channel.flush();
}

/**
 * Forwards the retrying flush to the wrapped channel unchanged.
 */
@Override
public void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
    this.channel.retryFlush(retryIntervalMillis, retryTimeoutMillis);
}

@Override
public boolean useDirectIO() {
return channel.useDirectIO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import com.automq.stream.s3.wal.exception.WALCapacityMismatchException;
import com.automq.stream.s3.wal.exception.WALNotInitializedException;
import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.IOException;
Expand All @@ -31,8 +30,6 @@
*/
public interface WALChannel {

Logger LOGGER = LoggerFactory.getLogger(WALChannel.class);

long DEFAULT_RETRY_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100);
long DEFAULT_RETRY_TIMEOUT = TimeUnit.MINUTES.toMillis(1);

Expand Down Expand Up @@ -77,24 +74,7 @@ default void retryWrite(ByteBuf src, long position) throws IOException {
/**
* Retry {@link #write(ByteBuf, long)} with the given interval until success or timeout.
*/
default void retryWrite(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
long start = System.nanoTime();
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
while (true) {
try {
write(src, position);
break;
} catch (IOException e) {
if (System.nanoTime() - start > retryTimeoutNanos) {
LOGGER.error("Failed to write, retry timeout", e);
throw e;
} else {
LOGGER.error("Failed to write, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
}
}
}
}
void retryWrite(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException;

/**
* Flush to disk.
Expand All @@ -108,57 +88,10 @@ default void retryFlush() throws IOException {
/**
* Retry {@link #flush()} with the given interval until success or timeout.
*/
default void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
long start = System.nanoTime();
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
while (true) {
try {
flush();
break;
} catch (IOException e) {
if (System.nanoTime() - start > retryTimeoutNanos) {
LOGGER.error("Failed to flush, retry timeout", e);
throw e;
} else {
LOGGER.error("Failed to flush, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
}
}
}
}

/**
* Call {@link #write(ByteBuf, long)} and {@link #flush()}.
*/
default void writeAndFlush(ByteBuf src, long position) throws IOException {
write(src, position);
flush();
}

default void retryWriteAndFlush(ByteBuf src, long position) throws IOException {
retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
}
void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException;

/**
* Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success or timeout.
*/
default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
long start = System.nanoTime();
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
while (true) {
try {
writeAndFlush(src, position);
break;
} catch (IOException e) {
if (System.nanoTime() - start > retryTimeoutNanos) {
LOGGER.error("Failed to write and flush, retry timeout", e);
throw e;
} else {
LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
}
}
}
default int read(ByteBuf dst, long position) throws IOException {
return read(dst, position, dst.writableBytes());
}

/**
Expand All @@ -171,34 +104,14 @@ default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMi
*/
int read(ByteBuf dst, long position, int length) throws IOException;

default int read(ByteBuf dst, long position) throws IOException {
return read(dst, position, dst.writableBytes());
}

default int retryRead(ByteBuf dst, long position) throws IOException {
return retryRead(dst, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
return retryRead(dst, position, dst.writableBytes(), DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
}

/**
* Retry {@link #read(ByteBuf, long)} with the given interval until success or timeout.
* Retry {@link #read(ByteBuf, long, int)} with the given interval until success or timeout.
*/
default int retryRead(ByteBuf dst, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
long start = System.nanoTime();
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
while (true) {
try {
return read(dst, position);
} catch (IOException e) {
if (System.nanoTime() - start > retryTimeoutNanos) {
LOGGER.error("Failed to read, retry timeout", e);
throw e;
} else {
LOGGER.error("Failed to read, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
}
}
}
}
int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis, long retryTimeoutMillis) throws IOException;

boolean useDirectIO();

Expand Down
Loading
Loading