diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index df7547598..9c2d544f5 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -41,6 +41,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -48,6 +49,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -136,12 +138,39 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.RecoverResult> it,
         return recoverContinuousRecords(it, openingStreams, LOGGER);
     }
 
+    /**
+     * Recover continuous records in each stream from the WAL, and put them into the returned {@link LogCache.LogCacheBlock}.
+     * It will filter out
+     * <ul>
+     *     <li>the records that are not in the opening streams</li>
+     *     <li>the records that have been committed</li>
+     *     <li>the records that are not continuous, which means, all records after the first discontinuous record</li>
+     * </ul>
+     *
+     * It throws {@link IllegalStateException} if the start offset of the first recovered record mismatches
+     * the end offset of any opening stream, which indicates data loss.
+     * <p>
+     * If there are out of order records (which should never happen or there is a BUG), it will try to re-order them.
+     * <p>
+     * For example, if we recover following records from the WAL in a stream:
+     * <pre>    1, 2, 3, 5, 4, 6, 10, 11</pre>
+     * and the {@link StreamMetadata#endOffset()} of this stream is 3. Then the returned {@link LogCache.LogCacheBlock}
+     * will contain records
+     * <pre>    3, 4, 5, 6</pre>
+     * Here,
+     * <ul>
+     *     <li>record 1 and 2 are dropped, as they have been committed (the end offset of the stream is 3)</li>
+     *     <li>record 4 and 5 are out of order and get re-ordered</li>
+     *     <li>record 10 and 11 are dropped, as they are not continuous (the expected next offset is 7)</li>
+     * </ul>
+     */
     static LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.RecoverResult> it,
         List<StreamMetadata> openingStreams, Logger logger) {
         Map<Long, Long> openingStreamEndOffsets = openingStreams.stream().collect(Collectors.toMap(StreamMetadata::streamId, StreamMetadata::endOffset));
         LogCache.LogCacheBlock cacheBlock = new LogCache.LogCacheBlock(1024L * 1024 * 1024);
         long logEndOffset = -1L;
         Map<Long, Long> streamNextOffsets = new HashMap<>();
+        Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords = new HashMap<>();
         while (it.hasNext()) {
             WriteAheadLog.RecoverResult recoverResult = it.next();
             logEndOffset = recoverResult.recordOffset();
@@ -159,15 +188,48 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.RecoverResult> it,
                 streamRecordBatch.release();
                 continue;
             }
             Long expectNextOffset = streamNextOffsets.get(streamId);
+            Queue<StreamRecordBatch> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
             if (expectNextOffset == null || expectNextOffset == streamRecordBatch.getBaseOffset()) {
+                // continuous record, put it into cache.
                 cacheBlock.put(streamRecordBatch);
-                streamNextOffsets.put(streamRecordBatch.getStreamId(), streamRecordBatch.getLastOffset());
+                expectNextOffset = streamRecordBatch.getLastOffset();
+                // check if there are some out of order records in the queue.
+                if (discontinuousRecords != null) {
+                    while (!discontinuousRecords.isEmpty()) {
+                        StreamRecordBatch peek = discontinuousRecords.peek();
+                        if (peek.getBaseOffset() == expectNextOffset) {
+                            // should never happen, log it.
+                            logger.error("[BUG] recover an out of order record, streamId={}, expectNextOffset={}, record={}", streamId, expectNextOffset, peek);
+                            cacheBlock.put(peek);
+                            discontinuousRecords.poll();
+                            expectNextOffset = peek.getLastOffset();
+                        } else {
+                            break;
+                        }
+                    }
+                }
+                // update next offset.
+                streamNextOffsets.put(streamRecordBatch.getStreamId(), expectNextOffset);
             } else {
-                logger.error("unexpected WAL record, streamId={}, expectNextOffset={}, record={}", streamId, expectNextOffset, streamRecordBatch);
-                streamRecordBatch.release();
+                // unexpected record, put it into discontinuous records queue.
+                if (discontinuousRecords == null) {
+                    discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(StreamRecordBatch::getBaseOffset));
+                    streamDiscontinuousRecords.put(streamId, discontinuousRecords);
+                }
+                discontinuousRecords.add(streamRecordBatch);
             }
         }
+
+        // release all discontinuous records.
+        streamDiscontinuousRecords.values().forEach(queue -> {
+            if (queue.isEmpty()) {
+                return;
+            }
+            logger.info("drop discontinuous records, records={}", queue);
+            queue.forEach(StreamRecordBatch::release);
+        });
+
         if (logEndOffset >= 0L) {
             cacheBlock.confirmOffset(logEndOffset);
         }
diff --git a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
index ce6194014..3413b65c9 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
@@ -223,7 +223,7 @@ public void testRecoverContinuousRecords() {
         );
         List<StreamMetadata> openingStreams = List.of(new StreamMetadata(233L, 0L, 0L, 11L, StreamState.OPENED));
-        LogCache.LogCacheBlock cacheBlock = storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
+        LogCache.LogCacheBlock cacheBlock = S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
 
         // ignore closed stream and noncontinuous records.
         assertEquals(1, cacheBlock.records().size());
         List<StreamRecordBatch> streamRecords = cacheBlock.records().get(233L);
@@ -231,18 +231,43 @@ public void testRecoverContinuousRecords() {
         assertEquals(11L, streamRecords.get(0).getBaseOffset());
         assertEquals(12L, streamRecords.get(1).getBaseOffset());
 
-        //
+        // simulate data loss
         openingStreams = List.of(
             new StreamMetadata(233L, 0L, 0L, 5L, StreamState.OPENED));
         boolean exception = false;
         try {
-            storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
+            S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
         } catch (IllegalStateException e) {
             exception = true;
         }
         Assertions.assertTrue(exception);
     }
 
+    @Test
+    public void testRecoverOutOfOrderRecords() {
+        List<WriteAheadLog.RecoverResult> recoverResults = List.of(
+            new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 9L))),
+            new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 10L))),
+            new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 13L))),
+            new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 11L))),
+            new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 12L))),
+            new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 14L))),
+            new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 20L)))
+        );
+
+        List<StreamMetadata> openingStreams = List.of(new StreamMetadata(42L, 0L, 0L, 10L, StreamState.OPENED));
+        LogCache.LogCacheBlock cacheBlock = S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams);
+        // record 9 is dropped (committed), record 20 is dropped (discontinuous), and record 13 is re-ordered.
+        assertEquals(1, cacheBlock.records().size());
+        List<StreamRecordBatch> streamRecords = cacheBlock.records().get(42L);
+        assertEquals(5, streamRecords.size());
+        assertEquals(10L, streamRecords.get(0).getBaseOffset());
+        assertEquals(11L, streamRecords.get(1).getBaseOffset());
+        assertEquals(12L, streamRecords.get(2).getBaseOffset());
+        assertEquals(13L, streamRecords.get(3).getBaseOffset());
+        assertEquals(14L, streamRecords.get(4).getBaseOffset());
+    }
+
     @Test
     public void testWALOverCapacity() throws WriteAheadLog.OverCapacityException {
         storage.append(newRecord(233L, 10L));
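
Below is a stand-alone sketch of the re-ordering technique this patch introduces, for readers outside the patch context. It is a simplified model rather than AutoMQ code: plain long offsets stand in for StreamRecordBatch, a List stands in for LogCache.LogCacheBlock, each record is assumed to span exactly one offset, and the class and variable names are illustrative only.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

public class RecoverReorderSketch {
    public static void main(String[] args) {
        // Offsets recovered from the WAL for one stream, in arrival order,
        // matching the Javadoc example: 1, 2, 3, 5, 4, 6, 10, 11.
        long[] recovered = {1, 2, 3, 5, 4, 6, 10, 11};
        long streamEndOffset = 3; // offsets below this are already committed

        List<Long> cache = new ArrayList<>();
        // Min-heap keyed by offset, mirroring the patch's PriorityQueue of
        // StreamRecordBatch ordered by base offset.
        Queue<Long> discontinuous = new PriorityQueue<>(Comparator.naturalOrder());
        long expectNextOffset = streamEndOffset;

        for (long offset : recovered) {
            if (offset < streamEndOffset) {
                continue; // committed record, skip it
            }
            if (offset == expectNextOffset) {
                // continuous record, accept it directly
                cache.add(offset);
                expectNextOffset = offset + 1;
                // drain queued records that have become continuous
                while (!discontinuous.isEmpty() && discontinuous.peek() == expectNextOffset) {
                    cache.add(discontinuous.poll());
                    expectNextOffset++;
                }
            } else {
                // out-of-order record: hold it until the gap closes
                discontinuous.add(offset);
            }
        }

        // Whatever is still queued is discontinuous and gets dropped
        // (the real code also releases the records' buffers at this point).
        System.out.println("cache = " + cache);           // [3, 4, 5, 6]
        System.out.println("dropped = " + discontinuous); // [10, 11]
    }
}

Running the sketch prints cache = [3, 4, 5, 6] and dropped = [10, 11], matching the behavior documented in the Javadoc and asserted by testRecoverOutOfOrderRecords: a late record is emitted as soon as the stream becomes continuous again, while records that never become continuous are dropped.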