diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 6d7495cef..3d44f26a6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -33,6 +33,7 @@ import com.automq.stream.s3.operator.S3Operator; import com.automq.stream.s3.streams.StreamManager; import com.automq.stream.s3.wal.WriteAheadLog; +import com.automq.stream.utils.FutureTicker; import com.automq.stream.utils.FutureUtil; import com.automq.stream.utils.ThreadUtils; import com.automq.stream.utils.Threads; @@ -90,13 +91,19 @@ public class S3Storage implements Storage { private final WALConfirmOffsetCalculator confirmOffsetCalculator = new WALConfirmOffsetCalculator(); private final Queue walPrepareQueue = new LinkedList<>(); private final Queue walCommitQueue = new LinkedList<>(); - private final List> inflightWALUploadTasks = new CopyOnWriteArrayList<>(); + private final List inflightWALUploadTasks = new CopyOnWriteArrayList<>(); private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER); private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPoolWithMonitor( 4, "s3-storage-upload-wal", true, LOGGER); + /** + * A ticker used for batching force upload WAL. + * @see #forceUpload + */ + private final FutureTicker forceUploadTicker = new FutureTicker(500, TimeUnit.MILLISECONDS, backgroundExecutor); + private final Queue backoffRecords = new LinkedBlockingQueue<>(); private final ScheduledFuture drainBackoffTask; private long lastLogTimestamp = 0L; @@ -424,12 +431,16 @@ private void continuousCheck(List records) { public CompletableFuture forceUpload(long streamId) { TimerUtil timer = new TimerUtil(); CompletableFuture cf = new CompletableFuture<>(); - List> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks); - // await inflight stream set object upload tasks to group force upload tasks. - CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> { - S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT_INFLIGHT); - uploadDeltaWAL(streamId); - FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf); + // Wait for a while to group force upload tasks. + forceUploadTicker.tick().whenComplete((nil, ex) -> { + S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT); + uploadDeltaWAL(streamId, true); + // Wait for all tasks contains streamId complete. + List> tasksContainsStream = this.inflightWALUploadTasks.stream() + .filter(it -> it.cache.containsStream(streamId)) + .map(it -> it.cf) + .toList(); + FutureUtil.propagate(CompletableFuture.allOf(tasksContainsStream.toArray(new CompletableFuture[0])), cf); if (LogCache.MATCH_ALL_STREAMS != streamId) { callbackSequencer.tryFree(streamId); } @@ -475,10 +486,10 @@ private Lock getStreamCallbackLock(long streamId) { @SuppressWarnings("UnusedReturnValue") CompletableFuture uploadDeltaWAL() { - return uploadDeltaWAL(LogCache.MATCH_ALL_STREAMS); + return uploadDeltaWAL(LogCache.MATCH_ALL_STREAMS, false); } - CompletableFuture uploadDeltaWAL(long streamId) { + CompletableFuture uploadDeltaWAL(long streamId, boolean force) { synchronized (deltaWALCache) { deltaWALCache.setConfirmOffset(confirmOffsetCalculator.get()); Optional blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId); @@ -486,6 +497,7 @@ CompletableFuture uploadDeltaWAL(long streamId) { LogCache.LogCacheBlock logCacheBlock = blockOpt.get(); DeltaWALUploadTaskContext context = new DeltaWALUploadTaskContext(logCacheBlock); context.objectManager = this.objectManager; + context.force = force; return uploadDeltaWAL(context); } else { return CompletableFuture.completedFuture(null); @@ -507,11 +519,11 @@ CompletableFuture uploadDeltaWAL(DeltaWALUploadTaskContext context) { context.timer = new TimerUtil(); CompletableFuture cf = new CompletableFuture<>(); context.cf = cf; - inflightWALUploadTasks.add(cf); + inflightWALUploadTasks.add(context); backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL")); cf.whenComplete((nil, ex) -> { S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE); - inflightWALUploadTasks.remove(cf); + inflightWALUploadTasks.remove(context); if (ex != null) { LOGGER.error("upload delta WAL fail", ex); } @@ -524,7 +536,7 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) { // calculate upload rate long elapsed = System.currentTimeMillis() - context.cache.createdTimestamp(); double rate; - if (elapsed <= 100L) { + if (context.force || elapsed <= 100L) { rate = Long.MAX_VALUE; } else { rate = context.cache.size() * 1000.0 / Math.min(5000L, elapsed); @@ -804,6 +816,11 @@ public static class DeltaWALUploadTaskContext { DeltaWALUploadTask task; CompletableFuture cf; ObjectManager objectManager; + /** + * Indicate whether to force upload the delta wal. + * If true, the delta wal will be uploaded without rate limit. + */ + boolean force; public DeltaWALUploadTaskContext(LogCache.LogCacheBlock cache) { this.cache = cache; diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java index b7dfcc870..d2c643058 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java @@ -372,6 +372,13 @@ public void free() { public long createdTimestamp() { return createdTimestamp; } + + public boolean containsStream(long streamId) { + if (MATCH_ALL_STREAMS == streamId) { + return true; + } + return map.containsKey(streamId); + } } static class StreamRange { diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java index 65a62aef8..cfd2ce5ca 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/operations/S3Stage.java @@ -29,7 +29,7 @@ public enum S3Stage { /* Append WAL stages end */ /* Force upload WAL start */ - FORCE_UPLOAD_WAL_AWAIT_INFLIGHT(S3Operation.FORCE_UPLOAD_STORAGE_WAL, "await_inflight"), + FORCE_UPLOAD_WAL_AWAIT(S3Operation.FORCE_UPLOAD_STORAGE_WAL, "await"), FORCE_UPLOAD_WAL_COMPLETE(S3Operation.FORCE_UPLOAD_STORAGE_WAL, "complete"), /* Force upload WAL end */ diff --git a/s3stream/src/main/java/com/automq/stream/utils/FutureTicker.java b/s3stream/src/main/java/com/automq/stream/utils/FutureTicker.java new file mode 100644 index 000000000..666d5ce6b --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/utils/FutureTicker.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.utils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * A ticker base on {@link CompletableFuture} + * TODO more docs and tests + */ +public class FutureTicker { + + private final Executor delayedExecutor; + + private CompletableFuture currentTick = CompletableFuture.completedFuture(null); + + public FutureTicker(long delay, TimeUnit unit, Executor executor) { + this.delayedExecutor = CompletableFuture.delayedExecutor(delay, unit, executor); + } + + public CompletableFuture tick() { + return maybeNextTick(); + } + + private synchronized CompletableFuture maybeNextTick() { + if (currentTick.isDone()) { + // a future which will complete after delay + currentTick = CompletableFuture.runAsync(() -> { }, delayedExecutor); + } + return currentTick; + } +}