Skip to content

Commit

Permalink
perf(s3storage): speedup forceUpload during shutdown (#858)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Dec 27, 2023
1 parent 6d5475d commit 711f87c
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 13 deletions.
41 changes: 29 additions & 12 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.s3.wal.WriteAheadLog;
import com.automq.stream.utils.FutureTicker;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
Expand Down Expand Up @@ -90,13 +91,19 @@ public class S3Storage implements Storage {
private final WALConfirmOffsetCalculator confirmOffsetCalculator = new WALConfirmOffsetCalculator();
private final Queue<DeltaWALUploadTaskContext> walPrepareQueue = new LinkedList<>();
private final Queue<DeltaWALUploadTaskContext> walCommitQueue = new LinkedList<>();
private final List<CompletableFuture<Void>> inflightWALUploadTasks = new CopyOnWriteArrayList<>();
private final List<DeltaWALUploadTaskContext> inflightWALUploadTasks = new CopyOnWriteArrayList<>();

private final ScheduledExecutorService backgroundExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-storage-background", true), LOGGER);
private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPoolWithMonitor(
4, "s3-storage-upload-wal", true, LOGGER);

/**
* A ticker used for batching force upload WAL.
* @see #forceUpload
*/
private final FutureTicker forceUploadTicker = new FutureTicker(500, TimeUnit.MILLISECONDS, backgroundExecutor);

private final Queue<WalWriteRequest> backoffRecords = new LinkedBlockingQueue<>();
private final ScheduledFuture<?> drainBackoffTask;
private long lastLogTimestamp = 0L;
Expand Down Expand Up @@ -424,12 +431,16 @@ private void continuousCheck(List<StreamRecordBatch> records) {
public CompletableFuture<Void> forceUpload(long streamId) {
TimerUtil timer = new TimerUtil();
CompletableFuture<Void> cf = new CompletableFuture<>();
List<CompletableFuture<Void>> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks);
// await inflight stream set object upload tasks to group force upload tasks.
CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenComplete((nil, ex) -> {
S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT_INFLIGHT);
uploadDeltaWAL(streamId);
FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf);
// Wait for a while to group force upload tasks.
forceUploadTicker.tick().whenComplete((nil, ex) -> {
S3StreamMetricsManager.recordStageLatency(timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.FORCE_UPLOAD_WAL_AWAIT);
uploadDeltaWAL(streamId, true);
// Wait for all tasks contains streamId complete.
List<CompletableFuture<Void>> tasksContainsStream = this.inflightWALUploadTasks.stream()
.filter(it -> it.cache.containsStream(streamId))
.map(it -> it.cf)
.toList();
FutureUtil.propagate(CompletableFuture.allOf(tasksContainsStream.toArray(new CompletableFuture[0])), cf);
if (LogCache.MATCH_ALL_STREAMS != streamId) {
callbackSequencer.tryFree(streamId);
}
Expand Down Expand Up @@ -475,17 +486,18 @@ private Lock getStreamCallbackLock(long streamId) {

@SuppressWarnings("UnusedReturnValue")
CompletableFuture<Void> uploadDeltaWAL() {
return uploadDeltaWAL(LogCache.MATCH_ALL_STREAMS);
return uploadDeltaWAL(LogCache.MATCH_ALL_STREAMS, false);
}

CompletableFuture<Void> uploadDeltaWAL(long streamId) {
CompletableFuture<Void> uploadDeltaWAL(long streamId, boolean force) {
synchronized (deltaWALCache) {
deltaWALCache.setConfirmOffset(confirmOffsetCalculator.get());
Optional<LogCache.LogCacheBlock> blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId);
if (blockOpt.isPresent()) {
LogCache.LogCacheBlock logCacheBlock = blockOpt.get();
DeltaWALUploadTaskContext context = new DeltaWALUploadTaskContext(logCacheBlock);
context.objectManager = this.objectManager;
context.force = force;
return uploadDeltaWAL(context);
} else {
return CompletableFuture.completedFuture(null);
Expand All @@ -507,11 +519,11 @@ CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
context.timer = new TimerUtil();
CompletableFuture<Void> cf = new CompletableFuture<>();
context.cf = cf;
inflightWALUploadTasks.add(cf);
inflightWALUploadTasks.add(context);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
cf.whenComplete((nil, ex) -> {
S3StreamMetricsManager.recordStageLatency(context.timer.elapsedAs(TimeUnit.NANOSECONDS), S3Stage.UPLOAD_WAL_COMPLETE);
inflightWALUploadTasks.remove(cf);
inflightWALUploadTasks.remove(context);
if (ex != null) {
LOGGER.error("upload delta WAL fail", ex);
}
Expand All @@ -524,7 +536,7 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {
// calculate upload rate
long elapsed = System.currentTimeMillis() - context.cache.createdTimestamp();
double rate;
if (elapsed <= 100L) {
if (context.force || elapsed <= 100L) {
rate = Long.MAX_VALUE;
} else {
rate = context.cache.size() * 1000.0 / Math.min(5000L, elapsed);
Expand Down Expand Up @@ -804,6 +816,11 @@ public static class DeltaWALUploadTaskContext {
DeltaWALUploadTask task;
CompletableFuture<Void> cf;
ObjectManager objectManager;
/**
* Indicate whether to force upload the delta wal.
* If true, the delta wal will be uploaded without rate limit.
*/
boolean force;

public DeltaWALUploadTaskContext(LogCache.LogCacheBlock cache) {
this.cache = cache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,13 @@ public void free() {
public long createdTimestamp() {
return createdTimestamp;
}

public boolean containsStream(long streamId) {
if (MATCH_ALL_STREAMS == streamId) {
return true;
}
return map.containsKey(streamId);
}
}

static class StreamRange {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public enum S3Stage {
/* Append WAL stages end */

/* Force upload WAL start */
FORCE_UPLOAD_WAL_AWAIT_INFLIGHT(S3Operation.FORCE_UPLOAD_STORAGE_WAL, "await_inflight"),
FORCE_UPLOAD_WAL_AWAIT(S3Operation.FORCE_UPLOAD_STORAGE_WAL, "await"),
FORCE_UPLOAD_WAL_COMPLETE(S3Operation.FORCE_UPLOAD_STORAGE_WAL, "complete"),
/* Force upload WAL end */

Expand Down
49 changes: 49 additions & 0 deletions s3stream/src/main/java/com/automq/stream/utils/FutureTicker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.utils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
* A ticker base on {@link CompletableFuture}
* TODO more docs and tests
*/
public class FutureTicker {

private final Executor delayedExecutor;

private CompletableFuture<Void> currentTick = CompletableFuture.completedFuture(null);

public FutureTicker(long delay, TimeUnit unit, Executor executor) {
this.delayedExecutor = CompletableFuture.delayedExecutor(delay, unit, executor);
}

public CompletableFuture<Void> tick() {
return maybeNextTick();
}

private synchronized CompletableFuture<Void> maybeNextTick() {
if (currentTick.isDone()) {
// a future which will complete after delay
currentTick = CompletableFuture.runAsync(() -> { }, delayedExecutor);
}
return currentTick;
}
}

0 comments on commit 711f87c

Please sign in to comment.