diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index a3da6bf38a..6c2b14f648 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -18,6 +18,7 @@ import com.automq.stream.s3.cache.S3BlockCache; import com.automq.stream.s3.context.AppendContext; import com.automq.stream.s3.context.FetchContext; +import com.automq.stream.s3.failover.Failover; import com.automq.stream.s3.metadata.StreamMetadata; import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.s3.metrics.TimerUtil; @@ -262,17 +263,25 @@ public void startup() { * Upload WAL to S3 and close opening streams. */ public void recover() throws Throwable { + this.deltaWAL.start(); recover0(this.deltaWAL, this.streamManager, this.objectManager, LOGGER); } + /** + * Be called by {@link Failover} to recover from crash. + * Note: {@link WriteAheadLog#start()} should be called before this method. + */ public void recover(WriteAheadLog deltaWAL, StreamManager streamManager, ObjectManager objectManager, Logger logger) throws Throwable { recover0(deltaWAL, streamManager, objectManager, logger); } + /** + * Recover WAL, upload WAL to S3 and close opening streams. + * Note: {@link WriteAheadLog#start()} should be called before this method. + */ void recover0(WriteAheadLog deltaWAL, StreamManager streamManager, ObjectManager objectManager, Logger logger) throws Throwable { - deltaWAL.start(); List streams = streamManager.getOpeningStreams().get(); LogCache.LogCacheBlock cacheBlock = recoverContinuousRecords(deltaWAL.recover(), streams, logger); diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index 0691f8eb9a..d13bfd58d7 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -263,6 +263,10 @@ private void parseRecordBody(long recoverStartOffset, SlidingWindowService.Recor @Override public WriteAheadLog start() throws IOException { + if (started.get()) { + LOGGER.warn("block WAL service already started"); + return this; + } StopWatch stopWatch = StopWatch.createStarted(); walChannel.open(channel -> Optional.ofNullable(tryReadWALHeader(walChannel)) @@ -381,6 +385,7 @@ public void shutdownGracefully() { @Override public WALMetadata metadata() { + checkStarted(); return new WALMetadata(walHeader.getNodeId(), walHeader.getEpoch()); }