Skip to content

Commit

Permalink
feat(s3stream/wal): recover a little more records after reaching the …
Browse files Browse the repository at this point in the history
…window (#1234)

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored May 10, 2024
1 parent 7da5bef commit 1dd8d09
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -823,13 +823,22 @@ protected class RecoverIterator implements Iterator<RecoverResult> {
private long maybeFirstInvalidCycle = -1;
private long maybeFirstInvalidOffset = -1;
private RecoverResult next;
private boolean strictMode = false;
private long lastValidOffset = -1;

public RecoverIterator(long nextRecoverOffset, long windowLength, long skipRecordAtOffset) {
this.nextRecoverOffset = nextRecoverOffset;
this.skipRecordAtOffset = skipRecordAtOffset;
this.windowLength = windowLength;
}

/**
* Only used for testing purpose.
*/
public void strictMode() {
this.strictMode = true;
}

@Override
public boolean hasNext() {
boolean hasNext = tryReadNextRecord();
Expand Down Expand Up @@ -861,22 +870,32 @@ private boolean tryReadNextRecord() {
if (next != null) {
return true;
}
while (maybeFirstInvalidOffset == -1 || nextRecoverOffset < maybeFirstInvalidOffset + windowLength) {
while (shouldContinue()) {
long cycle = WALUtil.calculateCycle(nextRecoverOffset, walHeader.getCapacity(), WAL_HEADER_TOTAL_CAPACITY);
boolean skip = nextRecoverOffset == skipRecordAtOffset;
try {
ByteBuf nextRecordBody = readRecord(nextRecoverOffset, offset -> WALUtil.recordOffsetToPosition(offset, walHeader.getCapacity(), WAL_HEADER_TOTAL_CAPACITY));
if (isOutOfWindow(nextRecoverOffset)) {
// should never happen, log it
LOGGER.error("[BUG] record offset out of window, offset: {}, firstInvalidOffset: {}, window: {}",
nextRecoverOffset, maybeFirstInvalidOffset, windowLength);
}
RecoverResultImpl recoverResult = new RecoverResultImpl(nextRecordBody, nextRecoverOffset);
lastValidOffset = nextRecoverOffset;

nextRecoverOffset += RECORD_HEADER_SIZE + nextRecordBody.readableBytes();

if (maybeFirstInvalidCycle != -1 && maybeFirstInvalidCycle != cycle) {
// we meet a valid record in the next cycle, so the "invalid" record we met before is not really invalid
maybeFirstInvalidOffset = -1;
maybeFirstInvalidCycle = -1;
}

if (skip) {
nextRecordBody.release();
continue;
}

next = recoverResult;
return true;
} catch (ReadRecordException e) {
Expand All @@ -892,5 +911,25 @@ private boolean tryReadNextRecord() {
}
return false;
}

private boolean shouldContinue() {
if (!isOutOfWindow(nextRecoverOffset)) {
// within the window
return true;
}
if (strictMode) {
// not in the window, and in strict mode, so we should stop
return false;
}
// allow to try to recover a little more records (no more than 4MiB)
return nextRecoverOffset < lastValidOffset + Math.min(windowLength, 1 << 22);
}

private boolean isOutOfWindow(long offset) {
if (maybeFirstInvalidOffset == -1) {
return false;
}
return offset >= maybeFirstInvalidOffset + windowLength;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.TestUtils;
import com.automq.stream.s3.wal.BlockWALService.RecoverIterator;
import com.automq.stream.s3.wal.benchmark.WriteBench;
import com.automq.stream.s3.wal.util.WALBlockDeviceChannel;
import com.automq.stream.s3.wal.util.WALChannel;
Expand Down Expand Up @@ -292,8 +293,7 @@ private static void testRecoverAfterMergeWrite0(boolean shutdown, boolean overCa
.build()
.start();
try {
Iterator<RecoverResult> recover = wal.recover();
assertNotNull(recover);
Iterator<RecoverResult> recover = recover(wal);

List<Long> recovered = new ArrayList<>(recordCount);
while (recover.hasNext()) {
Expand Down Expand Up @@ -615,11 +615,23 @@ public static Stream<Arguments> testRecoverFromDisasterData() {
);
}

/**
* Call {@link WriteAheadLog#recover()} and set to strict mode.
*/
private static Iterator<RecoverResult> recover(WriteAheadLog wal) {
Iterator<RecoverResult> iterator = wal.recover();
assertNotNull(iterator);
if (iterator instanceof RecoverIterator) {
((RecoverIterator) iterator).strictMode();
}
return iterator;
}

/**
* Call {@link WriteAheadLog#recover()} {@link WriteAheadLog#reset()} and drop all records.
*/
private static void recoverAndReset(WriteAheadLog wal) {
for (Iterator<RecoverResult> it = wal.recover(); it.hasNext(); ) {
for (Iterator<RecoverResult> it = recover(wal); it.hasNext(); ) {
it.next().record().release();
}
wal.reset().join();
Expand Down Expand Up @@ -788,8 +800,9 @@ private void testSingleThreadRecover0(boolean shutdown, boolean overCapacity, in
.build()
.start();
try {
Iterator<RecoverResult> recover = wal.recover();
Iterator<RecoverResult> recover = recover(wal);
assertNotNull(recover);
((RecoverIterator) recover).strictMode();

List<Long> recovered = new ArrayList<>(recordCount);
while (recover.hasNext()) {
Expand Down Expand Up @@ -864,8 +877,9 @@ private void testAppendAfterRecover0(boolean directIO) throws IOException, OverC
.start();
try {
// Recover records
Iterator<RecoverResult> recover = wal.recover();
Iterator<RecoverResult> recover = recover(wal);
assertNotNull(recover);
((RecoverIterator) recover).strictMode();

List<Long> recovered = new ArrayList<>();
while (recover.hasNext()) {
Expand Down Expand Up @@ -986,8 +1000,9 @@ private void testRecoverFromDisaster0(
.start();
try {
// Recover records
Iterator<RecoverResult> recover = wal.recover();
Iterator<RecoverResult> recover = recover(wal);
assertNotNull(recover);
((RecoverIterator) recover).strictMode();

List<Long> recovered = new ArrayList<>();
while (recover.hasNext()) {
Expand Down Expand Up @@ -1037,7 +1052,7 @@ private void testRecoverAfterReset0(boolean directIO) throws IOException, OverCa
.direct(directIO)
.build()
.start();
Iterator<RecoverResult> recover = wal2.recover();
Iterator<RecoverResult> recover = recover(wal2);
assertNotNull(recover);
List<Long> recovered1 = new ArrayList<>(recordCount);
while (recover.hasNext()) {
Expand All @@ -1056,7 +1071,7 @@ private void testRecoverAfterReset0(boolean directIO) throws IOException, OverCa
.direct(directIO)
.build()
.start();
recover = wal3.recover();
recover = recover(wal3);
assertNotNull(recover);
List<Long> recovered2 = new ArrayList<>(recordCount);
while (recover.hasNext()) {
Expand Down

0 comments on commit 1dd8d09

Please sign in to comment.