Skip to content

Commit

Permalink
fix(s3stream/wal): handle IOException during flushing WAL header
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Aug 16, 2024
1 parent 860d253 commit 58db90a
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ public static BlockWALServiceBuilder recoveryBuilder(String path) {
return new BlockWALServiceBuilder(path).recoveryMode(true);
}

private void flushWALHeader(ShutdownType shutdownType) {
private void flushWALHeader(ShutdownType shutdownType) throws IOException {
walHeader.setShutdownType(shutdownType);
flushWALHeader();
}

private synchronized void flushWALHeader() {
private synchronized void flushWALHeader() throws IOException {
long position = writeHeaderRoundTimes.getAndIncrement() % WAL_HEADER_COUNT * WAL_HEADER_CAPACITY;
walHeader.setLastWriteTimestamp(System.nanoTime());
long trimOffset = walHeader.getTrimOffset();
Expand Down Expand Up @@ -363,7 +363,7 @@ private BlockWALHeader newWALHeader() {
return new BlockWALHeader(walChannel.capacity(), initialWindowSize);
}

private void walHeaderReady(BlockWALHeader header) {
private void walHeaderReady(BlockWALHeader header) throws IOException {
if (nodeId != NOOP_NODE_ID) {
header.setNodeId(nodeId);
header.setEpoch(epoch);
Expand Down Expand Up @@ -392,7 +392,11 @@ public void shutdownGracefully() {
boolean gracefulShutdown = Optional.ofNullable(slidingWindowService)
.map(s -> s.shutdown(1, TimeUnit.DAYS))
.orElse(true);
flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY);
try {
flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY);
} catch (IOException ignored) {
// shutdown anyway
}

walChannel.close();

Expand Down Expand Up @@ -506,7 +510,13 @@ private CompletableFuture<Void> trim(long offset, boolean internal) {
}

walHeader.updateTrimOffset(offset);
return CompletableFuture.runAsync(this::flushWALHeader, walHeaderFlusher);
return CompletableFuture.runAsync(() -> {
try {
flushWALHeader();
} catch (IOException e) {
throw new RuntimeIOException(e);
}
}, walHeaderFlusher);
}

private void checkStarted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ private void writeBlockData(BlockBatch blocks) throws IOException {
StorageOperationStats.getInstance().appendWALWriteStats.record(TimerUtil.durationElapsedAs(start, TimeUnit.NANOSECONDS));
}

private void makeWriteOffsetMatchWindow(long newWindowEndOffset) {
private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException {
// align to block size
newWindowEndOffset = WALUtil.alignLargeByBlockSize(newWindowEndOffset);
long windowStartOffset = windowCoreData.getStartOffset();
Expand All @@ -400,7 +400,7 @@ private void makeWriteOffsetMatchWindow(long newWindowEndOffset) {
}

public interface WALHeaderFlusher {
void flush();
void flush() throws IOException;
}

public static class WindowCoreData {
Expand Down Expand Up @@ -442,7 +442,7 @@ public void updateWindowStartOffset(long offset) {
this.startOffset.accumulateAndGet(offset, Math::max);
}

public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) {
public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) throws IOException {
boolean scaleWindowHappened = false;
scaleOutLock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,28 @@ default void writeAndFlush(ByteBuf src, long position) throws IOException {
flush();
}

default void retryWriteAndFlush(ByteBuf src, long position) {
retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL);
default void retryWriteAndFlush(ByteBuf src, long position) throws IOException {
retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
}

/**
* Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success.
* Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success or timeout.
*/
default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis) {
default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
long start = System.nanoTime();
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
while (true) {
try {
writeAndFlush(src, position);
break;
} catch (IOException e) {
LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
if (System.nanoTime() - start > retryTimeoutNanos) {
LOGGER.error("Failed to write and flush, retry timeout", e);
throw e;
} else {
LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
}
}
}
}
Expand Down

0 comments on commit 58db90a

Please sign in to comment.