From 1dd8d0918d61a21814780fe7edfdf314fd69c8a7 Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Fri, 10 May 2024 20:50:57 +0800 Subject: [PATCH] feat(s3stream/wal): recover a little more records after reaching the window (#1234) Signed-off-by: Ning Yu --- .../automq/stream/s3/wal/BlockWALService.java | 41 ++++++++++++++++++- .../stream/s3/wal/BlockWALServiceTest.java | 31 ++++++++++---- 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index 7aa7343adc..b613294049 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -823,6 +823,8 @@ protected class RecoverIterator implements Iterator { private long maybeFirstInvalidCycle = -1; private long maybeFirstInvalidOffset = -1; private RecoverResult next; + private boolean strictMode = false; + private long lastValidOffset = -1; public RecoverIterator(long nextRecoverOffset, long windowLength, long skipRecordAtOffset) { this.nextRecoverOffset = nextRecoverOffset; @@ -830,6 +832,13 @@ public RecoverIterator(long nextRecoverOffset, long windowLength, long skipRecor this.windowLength = windowLength; } + /** + * Enable strict mode: recovery stops as soon as the next offset is out of the window. Only used for testing purposes. 
+ */ + public void strictMode() { + this.strictMode = true; + } + @Override public boolean hasNext() { boolean hasNext = tryReadNextRecord(); @@ -861,22 +870,32 @@ private boolean tryReadNextRecord() { if (next != null) { return true; } - while (maybeFirstInvalidOffset == -1 || nextRecoverOffset < maybeFirstInvalidOffset + windowLength) { + while (shouldContinue()) { long cycle = WALUtil.calculateCycle(nextRecoverOffset, walHeader.getCapacity(), WAL_HEADER_TOTAL_CAPACITY); boolean skip = nextRecoverOffset == skipRecordAtOffset; try { ByteBuf nextRecordBody = readRecord(nextRecoverOffset, offset -> WALUtil.recordOffsetToPosition(offset, walHeader.getCapacity(), WAL_HEADER_TOTAL_CAPACITY)); + if (isOutOfWindow(nextRecoverOffset)) { + // a record was read successfully beyond the window; this is unexpected (should never happen), so log it + LOGGER.error("[BUG] record offset out of window, offset: {}, firstInvalidOffset: {}, window: {}", + nextRecoverOffset, maybeFirstInvalidOffset, windowLength); + } RecoverResultImpl recoverResult = new RecoverResultImpl(nextRecordBody, nextRecoverOffset); + lastValidOffset = nextRecoverOffset; + nextRecoverOffset += RECORD_HEADER_SIZE + nextRecordBody.readableBytes(); + if (maybeFirstInvalidCycle != -1 && maybeFirstInvalidCycle != cycle) { // we meet a valid record in the next cycle, so the "invalid" record we met before is not really invalid maybeFirstInvalidOffset = -1; maybeFirstInvalidCycle = -1; } + if (skip) { nextRecordBody.release(); continue; } + next = recoverResult; return true; } catch (ReadRecordException e) { @@ -892,5 +911,25 @@ private boolean tryReadNextRecord() { } return false; } + + private boolean shouldContinue() { + if (!isOutOfWindow(nextRecoverOffset)) { + // within the window + return true; + } + if (strictMode) { + // not in the window, and in strict mode, so we should stop + return false; + } + // out of the window but not in strict mode: keep trying to recover a few more records, as long as the next offset is less than min(windowLength, 4 MiB) past the start of the last valid record + return nextRecoverOffset < lastValidOffset + Math.min(windowLength, 1 << 22); + } + + private boolean 
isOutOfWindow(long offset) { + if (maybeFirstInvalidOffset == -1) { + return false; + } + return offset >= maybeFirstInvalidOffset + windowLength; + } } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java index f4df06494f..d0a22b00d5 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java @@ -13,6 +13,7 @@ import com.automq.stream.s3.ByteBufAlloc; import com.automq.stream.s3.TestUtils; +import com.automq.stream.s3.wal.BlockWALService.RecoverIterator; import com.automq.stream.s3.wal.benchmark.WriteBench; import com.automq.stream.s3.wal.util.WALBlockDeviceChannel; import com.automq.stream.s3.wal.util.WALChannel; @@ -292,8 +293,7 @@ private static void testRecoverAfterMergeWrite0(boolean shutdown, boolean overCa .build() .start(); try { - Iterator recover = wal.recover(); - assertNotNull(recover); + Iterator recover = recover(wal); List recovered = new ArrayList<>(recordCount); while (recover.hasNext()) { @@ -615,11 +615,23 @@ public static Stream testRecoverFromDisasterData() { ); } + /** + * Call {@link WriteAheadLog#recover()}, assert that the returned iterator is not null, and switch it to strict mode if it is a {@link RecoverIterator}. + */ + private static Iterator recover(WriteAheadLog wal) { + Iterator iterator = wal.recover(); + assertNotNull(iterator); + if (iterator instanceof RecoverIterator) { + ((RecoverIterator) iterator).strictMode(); + } + return iterator; + } + /** * Call {@link WriteAheadLog#recover()} {@link WriteAheadLog#reset()} and drop all records. 
*/ private static void recoverAndReset(WriteAheadLog wal) { - for (Iterator it = wal.recover(); it.hasNext(); ) { + for (Iterator it = recover(wal); it.hasNext(); ) { it.next().record().release(); } wal.reset().join(); @@ -788,8 +800,9 @@ private void testSingleThreadRecover0(boolean shutdown, boolean overCapacity, in .build() .start(); try { - Iterator recover = wal.recover(); + Iterator recover = recover(wal); assertNotNull(recover); + ((RecoverIterator) recover).strictMode(); List recovered = new ArrayList<>(recordCount); while (recover.hasNext()) { @@ -864,8 +877,9 @@ private void testAppendAfterRecover0(boolean directIO) throws IOException, OverC .start(); try { // Recover records - Iterator recover = wal.recover(); + Iterator recover = recover(wal); assertNotNull(recover); + ((RecoverIterator) recover).strictMode(); List recovered = new ArrayList<>(); while (recover.hasNext()) { @@ -986,8 +1000,9 @@ private void testRecoverFromDisaster0( .start(); try { // Recover records - Iterator recover = wal.recover(); + Iterator recover = recover(wal); assertNotNull(recover); + ((RecoverIterator) recover).strictMode(); List recovered = new ArrayList<>(); while (recover.hasNext()) { @@ -1037,7 +1052,7 @@ private void testRecoverAfterReset0(boolean directIO) throws IOException, OverCa .direct(directIO) .build() .start(); - Iterator recover = wal2.recover(); + Iterator recover = recover(wal2); assertNotNull(recover); List recovered1 = new ArrayList<>(recordCount); while (recover.hasNext()) { @@ -1056,7 +1071,7 @@ private void testRecoverAfterReset0(boolean directIO) throws IOException, OverCa .direct(directIO) .build() .start(); - recover = wal3.recover(); + recover = recover(wal3); assertNotNull(recover); List recovered2 = new ArrayList<>(recordCount); while (recover.hasNext()) {