feat(s3stream): handle out of order records during recovery #932

Merged
merged 6 commits on Feb 19, 2024
68 changes: 65 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -41,13 +41,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -136,12 +138,39 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.Re
return recoverContinuousRecords(it, openingStreams, LOGGER);
}

/**
* Recover continuous records in each stream from the WAL, and put them into the returned {@link LogCache.LogCacheBlock}.
* It will filter out
* <ul>
* <li>the records that are not in the opening streams</li>
* <li>the records that have been committed</li>
* <li>the records that are not continuous, that is, all records after the first discontinuous record</li>
* </ul>
*
* It throws an {@link IllegalStateException} if the start offset of the first recovered record in a stream
* does not match the end offset of that opening stream, which indicates data loss.
* <p>
* If there are out-of-order records (which should never happen unless there is a bug), it will try to re-order them.
* <p>
* For example, if we recover the following records from the WAL in a stream:
* <pre> 1, 2, 3, 5, 4, 6, 10, 11</pre>
* and the {@link StreamMetadata#endOffset()} of this stream is 3. Then the returned {@link LogCache.LogCacheBlock}
* will contain records
* <pre> 3, 4, 5, 6</pre>
* Here,
* <ul>
* <li>Records 1 and 2 are discarded because they have already been committed (their offsets are less than 3, the end offset of the stream)</li>
* <li>Records 10 and 11 are discarded because they are not continuous (the expected offset after 6 is 7, not 10)</li>
* <li>Records 5 and 4 arrived out of order and are re-ordered; this should never happen, and is handled here defensively</li>
* </ul>
*/
static LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.RecoverResult> it,
List<StreamMetadata> openingStreams, Logger logger) {
Map<Long, Long> openingStreamEndOffsets = openingStreams.stream().collect(Collectors.toMap(StreamMetadata::streamId, StreamMetadata::endOffset));
LogCache.LogCacheBlock cacheBlock = new LogCache.LogCacheBlock(1024L * 1024 * 1024);
long logEndOffset = -1L;
Map<Long, Long> streamNextOffsets = new HashMap<>();
Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords = new HashMap<>();
while (it.hasNext()) {
WriteAheadLog.RecoverResult recoverResult = it.next();
logEndOffset = recoverResult.recordOffset();
@@ -159,15 +188,48 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.Re
recordBuf.release();
continue;
}

Long expectNextOffset = streamNextOffsets.get(streamId);
Queue<StreamRecordBatch> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
if (expectNextOffset == null || expectNextOffset == streamRecordBatch.getBaseOffset()) {
// continuous record, put it into cache.
cacheBlock.put(streamRecordBatch);
streamNextOffsets.put(streamRecordBatch.getStreamId(), streamRecordBatch.getLastOffset());
expectNextOffset = streamRecordBatch.getLastOffset();
// check if there are some out of order records in the queue.
if (discontinuousRecords != null) {
while (!discontinuousRecords.isEmpty()) {
StreamRecordBatch peek = discontinuousRecords.peek();
if (peek.getBaseOffset() == expectNextOffset) {
// should never happen, log it.
logger.error("[BUG] recover an out of order record, streamId={}, expectNextOffset={}, record={}", streamId, expectNextOffset, peek);
cacheBlock.put(peek);
discontinuousRecords.poll();
expectNextOffset = peek.getLastOffset();
} else {
break;
}
}
}
// update next offset.
streamNextOffsets.put(streamRecordBatch.getStreamId(), expectNextOffset);
} else {
logger.error("unexpected WAL record, streamId={}, expectNextOffset={}, record={}", streamId, expectNextOffset, streamRecordBatch);
streamRecordBatch.release();
// unexpected record, put it into discontinuous records queue.
if (discontinuousRecords == null) {
discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(StreamRecordBatch::getBaseOffset));
streamDiscontinuousRecords.put(streamId, discontinuousRecords);
}
discontinuousRecords.add(streamRecordBatch);
}
}
// release all discontinuous records.
streamDiscontinuousRecords.values().forEach(queue -> {
if (queue.isEmpty()) {
return;
}
logger.info("drop discontinuous records, records={}", queue);
queue.forEach(StreamRecordBatch::release);
});

if (logEndOffset >= 0L) {
cacheBlock.confirmOffset(logEndOffset);
}
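To make the recovery behavior easier to follow outside the diff, below is a minimal, self-contained sketch of the same buffering technique: a record that arrives ahead of the expected next offset waits in a `PriorityQueue` ordered by base offset, and the queue is drained whenever a record closes the gap. `ReorderSketch`, `Record`, and `recover` are hypothetical stand-ins invented for illustration; the real implementation is `S3Storage.recoverContinuousRecords` operating on `StreamRecordBatch`, which additionally releases record buffers and throws an `IllegalStateException` when a stream's first recovered record does not line up with that stream's end offset.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

final class ReorderSketch {
    /** Hypothetical stand-in for StreamRecordBatch, covering offsets [baseOffset, lastOffset). */
    record Record(long baseOffset, long lastOffset) { }

    /**
     * Keeps the longest continuous run of records starting at startOffset,
     * re-ordering out-of-order records and dropping everything after the
     * first real gap.
     */
    static List<Record> recover(List<Record> input, long startOffset) {
        List<Record> recovered = new ArrayList<>();
        // Discontinuous records wait here, smallest base offset first.
        Queue<Record> pending = new PriorityQueue<>(Comparator.comparingLong(Record::baseOffset));
        long next = startOffset;
        for (Record r : input) {
            if (r.lastOffset() <= next) {
                continue; // fully committed already: skip
            }
            if (r.baseOffset() != next) {
                pending.add(r); // out of order, or a real gap: buffer it
                continue;
            }
            recovered.add(r);
            next = r.lastOffset();
            // A record just landed; drain any buffered records that are now continuous.
            while (!pending.isEmpty() && pending.peek().baseOffset() == next) {
                Record p = pending.poll();
                recovered.add(p);
                next = p.lastOffset();
            }
        }
        // Anything still pending sits behind a real gap; the patch logs and
        // releases such records, while this sketch simply drops them.
        return recovered;
    }
}
```

Fed the Javadoc example (records 1, 2, 3, 5, 4, 6, 10, 11 with a stream end offset of 3, each record covering a single offset), `recover` returns 3, 4, 5, 6, matching the documented result.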
31 changes: 28 additions & 3 deletions s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
@@ -223,26 +223,51 @@ public void testRecoverContinuousRecords() {
);

List<StreamMetadata> openingStreams = List.of(new StreamMetadata(233L, 0L, 0L, 11L, StreamState.OPENED));
LogCache.LogCacheBlock cacheBlock = storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
LogCache.LogCacheBlock cacheBlock = S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
// ignore the closed stream and non-continuous records.
assertEquals(1, cacheBlock.records().size());
List<StreamRecordBatch> streamRecords = cacheBlock.records().get(233L);
assertEquals(2, streamRecords.size());
assertEquals(11L, streamRecords.get(0).getBaseOffset());
assertEquals(12L, streamRecords.get(1).getBaseOffset());

//
// simulate data loss
openingStreams = List.of(
new StreamMetadata(233L, 0L, 0L, 5L, StreamState.OPENED));
boolean exception = false;
try {
storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
} catch (IllegalStateException e) {
exception = true;
}
Assertions.assertTrue(exception);
}

@Test
public void testRecoverOutOfOrderRecords() {
List<WriteAheadLog.RecoverResult> recoverResults = List.of(
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 9L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 10L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 13L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 11L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 12L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 14L))),
new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 20L)))
);

List<StreamMetadata> openingStreams = List.of(new StreamMetadata(42L, 0L, 0L, 10L, StreamState.OPENED));
LogCache.LogCacheBlock cacheBlock = S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
// the out-of-order records are re-ordered; the record after the gap (20) is dropped.
assertEquals(1, cacheBlock.records().size());
List<StreamRecordBatch> streamRecords = cacheBlock.records().get(42L);
assertEquals(5, streamRecords.size());
assertEquals(10L, streamRecords.get(0).getBaseOffset());
assertEquals(11L, streamRecords.get(1).getBaseOffset());
assertEquals(12L, streamRecords.get(2).getBaseOffset());
assertEquals(13L, streamRecords.get(3).getBaseOffset());
assertEquals(14L, streamRecords.get(4).getBaseOffset());
}

@Test
public void testWALOverCapacity() throws WriteAheadLog.OverCapacityException {
storage.append(newRecord(233L, 10L));
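As a cross-check, replaying the input of testRecoverOutOfOrderRecords through the hypothetical `ReorderSketch` above yields the same five records the test asserts. This assumes each `newRecord(streamId, offset)` produces a batch covering exactly one offset, which is what the test's assertions imply.

```java
import java.util.List;

class ReorderSketchDemo {
    public static void main(String[] args) {
        // Each Record(base, last) mirrors newRecord(42L, base), assuming a
        // single-offset batch; the stream's end offset is 10.
        List<ReorderSketch.Record> wal = List.of(
            new ReorderSketch.Record(9, 10),   // fully committed: dropped
            new ReorderSketch.Record(10, 11),  // continuous: kept
            new ReorderSketch.Record(13, 14),  // out of order: buffered
            new ReorderSketch.Record(11, 12),  // continuous: kept
            new ReorderSketch.Record(12, 13),  // continuous: kept, then 13 drains from the buffer
            new ReorderSketch.Record(14, 15),  // continuous: kept
            new ReorderSketch.Record(20, 21)); // behind a real gap (15..19 missing): dropped
        // Prints base offsets 10, 11, 12, 13, 14.
        ReorderSketch.recover(wal, 10L)
            .forEach(r -> System.out.println(r.baseOffset()));
    }
}
```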