From bd0b0293eb6ec7dce0116ae94cc605ed63e5e23d Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Fri, 25 Oct 2024 15:26:03 +0800 Subject: [PATCH] fix(s3stream): fix compaction exit on empty stream metadata list (#2095) Signed-off-by: Shichao Nie --- .../stream/s3/compact/CompactionManager.java | 8 -- .../s3/compact/CompactionManagerTest.java | 81 +++++++++++++++++++ 2 files changed, 81 insertions(+), 8 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 3f490f489f..4b5b77bee2 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -228,10 +228,6 @@ public CompletableFuture compact() { return CompletableFuture.failedFuture(e); } return this.streamManager.getStreams(new ArrayList<>(streamIds)).thenAcceptAsync(streamMetadataList -> { - if (streamMetadataList.isEmpty()) { - logger.info("No stream metadata found for stream set objects"); - return; - } filterInvalidStreamDataBlocks(streamMetadataList); this.compact(streamMetadataList, objectMetadataList); }, compactThreadPool); @@ -422,10 +418,6 @@ public CompletableFuture forceSplitAll() { List streamIds = streamDataBlockMap.values().stream().flatMap(Collection::stream) .map(StreamDataBlock::getStreamId).distinct().collect(Collectors.toList()); this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> { - if (streamMetadataList.isEmpty()) { - logger.info("No stream metadata found for stream set objects"); - return; - } filterInvalidStreamDataBlocks(streamMetadataList); forceSplitObjects(streamMetadataList, objectMetadataList); cf.complete(null); diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java index d00562e53e..fe925d0b10 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java @@ -198,6 +198,72 @@ public void testForceSplitWithOutDatedObject() { Assertions.assertTrue(request.getStreamRanges().isEmpty()); } + + @Test + public void testForceSplitWithNonExistStream() { + List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); + streamMetadataList = streamMetadataList.stream().filter(s -> s.streamId() != STREAM_0).collect(Collectors.toList()); + List s3ObjectMetadata = this.objectManager.getServerObjects().join(); + when(config.streamSetObjectCompactionForceSplitPeriod()).thenReturn(0); + compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage); + + compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(0))); + compactionManager.filterInvalidStreamDataBlocks(streamMetadataList); + CommitStreamSetObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0)); + Assertions.assertEquals(-1, request.getObjectId()); + Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds()); + Assertions.assertEquals(2, request.getStreamObjects().size()); + Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(0)), request)); + + compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(1))); + compactionManager.filterInvalidStreamDataBlocks(streamMetadataList); + request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(1)); + Assertions.assertEquals(-1, request.getObjectId()); + Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds()); + Assertions.assertEquals(1, request.getStreamObjects().size()); + Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(1)), request)); + + compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(2))); + compactionManager.filterInvalidStreamDataBlocks(streamMetadataList); + request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(2)); + Assertions.assertEquals(-1, request.getObjectId()); + Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds()); + Assertions.assertEquals(2, request.getStreamObjects().size()); + Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(2)), request)); + } + + @Test + public void testForceSplitWithEmptyStreamList() { + List streamMetadataList = Collections.emptyList(); + List s3ObjectMetadata = this.objectManager.getServerObjects().join(); + when(config.streamSetObjectCompactionForceSplitPeriod()).thenReturn(0); + compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage); + + compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(0))); + compactionManager.filterInvalidStreamDataBlocks(streamMetadataList); + CommitStreamSetObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0)); + Assertions.assertEquals(-1, request.getObjectId()); + Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds()); + Assertions.assertTrue(request.getStreamObjects().isEmpty()); + Assertions.assertTrue(request.getStreamRanges().isEmpty()); + + compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(1))); + compactionManager.filterInvalidStreamDataBlocks(streamMetadataList); + request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(1)); + Assertions.assertEquals(-1, request.getObjectId()); + Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds()); + Assertions.assertTrue(request.getStreamObjects().isEmpty()); + Assertions.assertTrue(request.getStreamRanges().isEmpty()); + + compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(2))); + compactionManager.filterInvalidStreamDataBlocks(streamMetadataList); + request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(2)); + Assertions.assertEquals(-1, request.getObjectId()); + Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds()); + Assertions.assertTrue(request.getStreamObjects().isEmpty()); + Assertions.assertTrue(request.getStreamRanges().isEmpty()); + } + @Test public void testForceSplitWithException() { S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class); @@ -462,6 +528,21 @@ public void testCompactWithNonExistStream() { Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request)); } + @Test + public void testCompactWithEmptyStream() { + compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage); + List streamMetadataList = Collections.emptyList(); + compactionManager.updateStreamDataBlockMap(S3_WAL_OBJECT_METADATA_LIST); + compactionManager.filterInvalidStreamDataBlocks(streamMetadataList); + CommitStreamSetObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST); + + Assertions.assertEquals(-1, request.getObjectId()); + Assertions.assertTrue(request.getStreamObjects().isEmpty()); + Assertions.assertTrue(request.getStreamRanges().isEmpty()); + assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds()); + Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request)); + } + @Test public void testCompactNoneExistObjects() { when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(100L);