Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(s3stream): make executor static to prevent stack overflow #633

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@
//TODO: refactor to reduce duplicate code with ObjectWriter
public class DataBlockReader {
private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockReader.class);
private static final ScheduledExecutorService BUCKET_CALLBACK_EXECUTOR = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", true), LOGGER);
private final S3ObjectMetadata metadata;
private final String objectKey;
private final S3Operator s3Operator;
private final CompletableFuture<List<StreamDataBlock>> indexBlockCf = new CompletableFuture<>();
private final Bucket throttleBucket;
private final ScheduledExecutorService bucketCbExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", false), LOGGER);

public DataBlockReader(S3ObjectMetadata metadata, S3Operator s3Operator) {
this(metadata, s3Operator, null);
Expand Down Expand Up @@ -186,7 +186,7 @@ private CompletableFuture<ByteBuf> rangeRead(long start, long end) {
if (throttleBucket == null) {
return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2);
} else {
return throttleBucket.asScheduler().consume(end - start + 1, bucketCbExecutor)
return throttleBucket.asScheduler().consume(end - start + 1, BUCKET_CALLBACK_EXECUTOR)
.thenCompose(v -> s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2));
}
}
Expand Down