From 1176ad218175f04bd532d5f00828350a84823e0d Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Wed, 15 Nov 2023 10:58:44 +0800 Subject: [PATCH] fix(s3stream): use dedicated executor for token bucket in DataBlockReader Signed-off-by: Shichao Nie --- .../stream/s3/compact/CompactionManager.java | 21 ++++++++++++------- .../s3/compact/operator/DataBlockReader.java | 17 ++++++++------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 62d286d13..20b1410ce 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -34,6 +34,8 @@ import com.automq.stream.s3.operator.S3Operator; import com.automq.stream.s3.streams.StreamManager; import com.automq.stream.utils.LogContext; +import com.automq.stream.utils.ThreadUtils; +import com.automq.stream.utils.Threads; import io.github.bucket4j.Bucket; import io.netty.util.concurrent.DefaultThreadFactory; @@ -65,7 +67,8 @@ public class CompactionManager { private final StreamManager streamManager; private final S3Operator s3Operator; private final CompactionAnalyzer compactionAnalyzer; - private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledExecutorService compactScheduledExecutor; + private final ScheduledExecutorService bucketCallbackScheduledExecutor; private final ExecutorService compactThreadPool; private final ExecutorService forceSplitThreadPool; private final CompactionUploader uploader; @@ -100,7 +103,10 @@ public CompactionManager(Config config, ObjectManager objectManager, StreamManag maxStreamObjectNumPerCommit = config.maxStreamObjectNumPerCommit(); this.compactionAnalyzer = new CompactionAnalyzer(compactionCacheSize, streamSplitSize, maxStreamNumPerStreamSetObject, maxStreamObjectNumPerCommit, new LogContext(String.format("[CompactionAnalyzer id=%d] ", config.brokerId()))); - this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("schedule-compact-executor")); + this.compactScheduledExecutor = Threads.newSingleThreadScheduledExecutor( + ThreadUtils.createThreadFactory("schedule-compact-executor-%d", true), logger); + this.bucketCallbackScheduledExecutor = Threads.newSingleThreadScheduledExecutor( + ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", true), logger); this.compactThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("object-compaction-manager")); this.forceSplitThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("force-split-executor")); this.logger.info("Compaction manager initialized with config: compactionInterval: {} min, compactionCacheSize: {} bytes, " + @@ -114,7 +120,7 @@ public void start() { private void scheduleNextCompaction(long delayMillis) { logger.info("Next Compaction started in {} ms", delayMillis); - this.scheduledExecutorService.schedule(() -> { + this.compactScheduledExecutor.schedule(() -> { TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS); try { logger.info("Compaction started"); @@ -134,7 +140,8 @@ private void scheduleNextCompaction(long delayMillis) { } public void shutdown() { - this.scheduledExecutorService.shutdown(); + this.compactScheduledExecutor.shutdown(); + this.bucketCallbackScheduledExecutor.shutdown(); this.uploader.stop(); } @@ -264,7 +271,7 @@ private void logCompactionPlans(List compactionPlans, Set public CompletableFuture forceSplitAll() { CompletableFuture cf = new CompletableFuture<>(); //TODO: deal with metadata delay - this.scheduledExecutorService.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> { + this.compactScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> { List streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream()) .map(StreamOffsetRange::getStreamId).distinct().toList(); this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> { @@ -341,7 +348,7 @@ private Collection> splitStreamSetObject(List { List blocksToRead = batchGroup.stream().flatMap(p -> p.getLeft().stream()).toList(); - DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator, compactionBucket); + DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator, compactionBucket, bucketCallbackScheduledExecutor); // batch read reader.readBlocks(blocksToRead, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkBandwidth)); @@ -528,7 +535,7 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List> streamDataBlocEntry : compactionPlan.streamDataBlocksMap().entrySet()) { S3ObjectMetadata metadata = s3ObjectMetadataMap.get(streamDataBlocEntry.getKey()); List streamDataBlocks = streamDataBlocEntry.getValue(); - DataBlockReader reader = new DataBlockReader(metadata, s3Operator, compactionBucket); + DataBlockReader reader = new DataBlockReader(metadata, s3Operator, compactionBucket, bucketCallbackScheduledExecutor); reader.readBlocks(streamDataBlocks, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkBandwidth)); } List> streamObjectCFList = new ArrayList<>(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java index 116207508..d336f8711 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java @@ -21,8 +21,6 @@ import com.automq.stream.s3.compact.objects.StreamDataBlock; import com.automq.stream.s3.network.ThrottleStrategy; import com.automq.stream.s3.operator.S3Operator; -import com.automq.stream.utils.ThreadUtils; -import com.automq.stream.utils.Threads; import io.github.bucket4j.Bucket; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; @@ -33,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; @@ -45,18 +44,22 @@ public class DataBlockReader { private final S3Operator s3Operator; private final CompletableFuture> indexBlockCf = new CompletableFuture<>(); private final Bucket throttleBucket; - private final ScheduledExecutorService bucketCbExecutor = Threads.newSingleThreadScheduledExecutor( - ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", false), LOGGER); + private final ScheduledExecutorService bucketCallbackExecutor; public DataBlockReader(S3ObjectMetadata metadata, S3Operator s3Operator) { - this(metadata, s3Operator, null); + this(metadata, s3Operator, null, null); } - public DataBlockReader(S3ObjectMetadata metadata, S3Operator s3Operator, Bucket throttleBucket) { + public DataBlockReader(S3ObjectMetadata metadata, S3Operator s3Operator, Bucket throttleBucket, ScheduledExecutorService bucketCallbackExecutor) { this.metadata = metadata; this.objectKey = metadata.key(); this.s3Operator = s3Operator; this.throttleBucket = throttleBucket; + if (this.throttleBucket != null) { + this.bucketCallbackExecutor = Objects.requireNonNull(bucketCallbackExecutor); + } else { + this.bucketCallbackExecutor = null; + } } public CompletableFuture> getDataBlockIndex() { @@ -186,7 +189,7 @@ private CompletableFuture rangeRead(long start, long end) { if (throttleBucket == null) { return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2); } else { - return throttleBucket.asScheduler().consume(end - start + 1, bucketCbExecutor) + return throttleBucket.asScheduler().consume(end - start + 1, bucketCallbackExecutor) .thenCompose(v -> s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2)); } }