
fix(s3stream): use dedicated executor for token bucket in DataBlockReader #635

Merged · 1 commit · Nov 15, 2023
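In short: DataBlockReader previously created its own single-thread scheduled executor for bucket4j token-refill callbacks, with non-daemon threads and no shutdown path, so every throttled reader instance could leave a thread behind. The executor is now injected through the constructor; CompactionManager owns a single shared daemon-threaded bucketCallbackScheduledExecutor, passes it to every DataBlockReader it creates, and shuts it down together with the compaction scheduler (renamed from scheduledExecutorService to compactScheduledExecutor).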
CompactionManager.java:
@@ -34,6 +34,8 @@
 import com.automq.stream.s3.operator.S3Operator;
 import com.automq.stream.s3.streams.StreamManager;
 import com.automq.stream.utils.LogContext;
+import com.automq.stream.utils.ThreadUtils;
+import com.automq.stream.utils.Threads;
 import io.github.bucket4j.Bucket;
 import io.netty.util.concurrent.DefaultThreadFactory;

@@ -65,7 +67,8 @@ public class CompactionManager {
     private final StreamManager streamManager;
     private final S3Operator s3Operator;
     private final CompactionAnalyzer compactionAnalyzer;
-    private final ScheduledExecutorService scheduledExecutorService;
+    private final ScheduledExecutorService compactScheduledExecutor;
+    private final ScheduledExecutorService bucketCallbackScheduledExecutor;
     private final ExecutorService compactThreadPool;
     private final ExecutorService forceSplitThreadPool;
     private final CompactionUploader uploader;
@@ -100,7 +103,10 @@ public CompactionManager(Config config, ObjectManager objectManager, StreamManag
         maxStreamObjectNumPerCommit = config.maxStreamObjectNumPerCommit();
         this.compactionAnalyzer = new CompactionAnalyzer(compactionCacheSize, streamSplitSize, maxStreamNumPerStreamSetObject,
             maxStreamObjectNumPerCommit, new LogContext(String.format("[CompactionAnalyzer id=%d] ", config.brokerId())));
-        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("schedule-compact-executor"));
+        this.compactScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
+            ThreadUtils.createThreadFactory("schedule-compact-executor-%d", true), logger);
+        this.bucketCallbackScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
+            ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", true), logger);
         this.compactThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("object-compaction-manager"));
         this.forceSplitThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("force-split-executor"));
         this.logger.info("Compaction manager initialized with config: compactionInterval: {} min, compactionCacheSize: {} bytes, " +
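For context, the Threads and ThreadUtils helpers are not part of this diff; the sketch below is only an assumption of roughly what they add over a plain Executors call: named daemon threads plus uncaught-exception logging, so a dying scheduler thread surfaces in the broker log. ThreadFactories is a hypothetical stand-in name.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;

final class ThreadFactories {
    // Daemon threads named by pattern, e.g. "schedule-compact-executor-0".
    static ThreadFactory named(String pattern, boolean daemon, Logger logger) {
        AtomicInteger seq = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, String.format(pattern, seq.getAndIncrement()));
            t.setDaemon(daemon);
            // Route uncaught exceptions to the log instead of losing them with the thread.
            t.setUncaughtExceptionHandler((thread, e) ->
                logger.error("Uncaught exception in {}", thread.getName(), e));
            return t;
        };
    }

    static ScheduledExecutorService singleThreadScheduled(String pattern, Logger logger) {
        return Executors.newSingleThreadScheduledExecutor(named(pattern, true, logger));
    }
}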
@@ -114,7 +120,7 @@ public void start() {

     private void scheduleNextCompaction(long delayMillis) {
         logger.info("Next Compaction started in {} ms", delayMillis);
-        this.scheduledExecutorService.schedule(() -> {
+        this.compactScheduledExecutor.schedule(() -> {
             TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
             try {
                 logger.info("Compaction started");
@@ -134,7 +140,8 @@ private void scheduleNextCompaction(long delayMillis) {
     }

     public void shutdown() {
-        this.scheduledExecutorService.shutdown();
+        this.compactScheduledExecutor.shutdown();
+        this.bucketCallbackScheduledExecutor.shutdown();
         this.uploader.stop();
     }
@@ -264,7 +271,7 @@ private void logCompactionPlans(List<CompactionPlan> compactionPlans, Set<Long>
     public CompletableFuture<Void> forceSplitAll() {
         CompletableFuture<Void> cf = new CompletableFuture<>();
         //TODO: deal with metadata delay
-        this.scheduledExecutorService.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> {
+        this.compactScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> {
             List<Long> streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream())
                 .map(StreamOffsetRange::getStreamId).distinct().toList();
             this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> {
@@ -341,7 +348,7 @@ private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<St
             objectManager.prepareObject(batchGroup.size(), TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES))
                 .thenComposeAsync(objectId -> {
                     List<StreamDataBlock> blocksToRead = batchGroup.stream().flatMap(p -> p.getLeft().stream()).toList();
-                    DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator, compactionBucket);
+                    DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator, compactionBucket, bucketCallbackScheduledExecutor);
                     // batch read
                     reader.readBlocks(blocksToRead, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkBandwidth));
@@ -528,7 +535,7 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<Compactio
         for (Map.Entry<Long, List<StreamDataBlock>> streamDataBlocEntry : compactionPlan.streamDataBlocksMap().entrySet()) {
             S3ObjectMetadata metadata = s3ObjectMetadataMap.get(streamDataBlocEntry.getKey());
             List<StreamDataBlock> streamDataBlocks = streamDataBlocEntry.getValue();
-            DataBlockReader reader = new DataBlockReader(metadata, s3Operator, compactionBucket);
+            DataBlockReader reader = new DataBlockReader(metadata, s3Operator, compactionBucket, bucketCallbackScheduledExecutor);
             reader.readBlocks(streamDataBlocks, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkBandwidth));
         }
         List<CompletableFuture<StreamObject>> streamObjectCFList = new ArrayList<>();
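Taken together, the ownership model in CompactionManager is now: create the bucket-callback executor once, hand it to every reader, shut it down once. A condensed sketch (objectsToCompact, blocksToRead and maxBatchBytes are hypothetical locals, not names from the patch):

ScheduledExecutorService bucketCallbackScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
    ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", true), logger);

// One shared callback thread serves all readers, instead of one thread per reader.
for (S3ObjectMetadata objectMetadata : objectsToCompact) {
    DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator, compactionBucket,
        bucketCallbackScheduledExecutor);
    reader.readBlocks(blocksToRead, maxBatchBytes);
}

// Stopped exactly once, alongside compactScheduledExecutor, in shutdown().
bucketCallbackScheduledExecutor.shutdown();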
DataBlockReader.java:
@@ -21,8 +21,6 @@
 import com.automq.stream.s3.compact.objects.StreamDataBlock;
 import com.automq.stream.s3.network.ThrottleStrategy;
 import com.automq.stream.s3.operator.S3Operator;
-import com.automq.stream.utils.ThreadUtils;
-import com.automq.stream.utils.Threads;
 import io.github.bucket4j.Bucket;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
@@ -33,6 +31,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -45,18 +44,22 @@ public class DataBlockReader {
     private final S3Operator s3Operator;
     private final CompletableFuture<List<StreamDataBlock>> indexBlockCf = new CompletableFuture<>();
     private final Bucket throttleBucket;
-    private final ScheduledExecutorService bucketCbExecutor = Threads.newSingleThreadScheduledExecutor(
-        ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", false), LOGGER);
+    private final ScheduledExecutorService bucketCallbackExecutor;

     public DataBlockReader(S3ObjectMetadata metadata, S3Operator s3Operator) {
-        this(metadata, s3Operator, null);
+        this(metadata, s3Operator, null, null);
     }

-    public DataBlockReader(S3ObjectMetadata metadata, S3Operator s3Operator, Bucket throttleBucket) {
+    public DataBlockReader(S3ObjectMetadata metadata, S3Operator s3Operator, Bucket throttleBucket, ScheduledExecutorService bucketCallbackExecutor) {
         this.metadata = metadata;
         this.objectKey = metadata.key();
         this.s3Operator = s3Operator;
         this.throttleBucket = throttleBucket;
+        if (this.throttleBucket != null) {
+            this.bucketCallbackExecutor = Objects.requireNonNull(bucketCallbackExecutor);
+        } else {
+            this.bucketCallbackExecutor = null;
+        }
     }

     public CompletableFuture<List<StreamDataBlock>> getDataBlockIndex() {
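The new constructor contract, in usage form (variable names as in CompactionManager): a throttle bucket now requires a callback executor, enforced with Objects.requireNonNull, while the unthrottled path needs neither.

// Unthrottled read: no bucket, no callback executor.
DataBlockReader plainReader = new DataBlockReader(metadata, s3Operator);

// Throttled read: a non-null bucket without an executor now fails fast with an NPE
// at construction time, rather than later inside rangeRead.
DataBlockReader throttledReader = new DataBlockReader(metadata, s3Operator, compactionBucket,
    bucketCallbackScheduledExecutor);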
@@ -186,7 +189,7 @@ private CompletableFuture<ByteBuf> rangeRead(long start, long end) {
         if (throttleBucket == null) {
             return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2);
         } else {
-            return throttleBucket.asScheduler().consume(end - start + 1, bucketCbExecutor)
+            return throttleBucket.asScheduler().consume(end - start + 1, bucketCallbackExecutor)
                 .thenCompose(v -> s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2));
         }
     }
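For reference, a self-contained sketch of the bucket4j pattern that rangeRead relies on (the 10 MiB/s limit is illustrative, not an AutoMQ default): asScheduler().consume(tokens, executor) returns a CompletableFuture that completes on the supplied executor once enough tokens have refilled, and the S3 read is chained after it. This is why the executor's lifecycle matters: the old per-reader executor was never shut down, while the injected one is owned and stopped by CompactionManager.

import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class ThrottledReadExample {
    public static void main(String[] args) {
        // Token bucket refilling 10 MiB per second; tokens stand for bytes.
        Bucket bucket = Bucket.builder()
            .addLimit(Bandwidth.simple(10 * 1024 * 1024, Duration.ofSeconds(1)))
            .build();
        ScheduledExecutorService callbackExecutor = Executors.newSingleThreadScheduledExecutor();

        long start = 0;
        long end = 4 * 1024 * 1024 - 1;
        // Wait (asynchronously) for end - start + 1 tokens, then perform the "read".
        CompletableFuture<String> read = bucket.asScheduler().consume(end - start + 1, callbackExecutor)
            .thenCompose(v -> CompletableFuture.completedFuture("read range " + start + "-" + end));

        System.out.println(read.join());
        callbackExecutor.shutdown();
    }
}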