Skip to content

Commit

Permalink
feat(s3stream): reduce stream object compaction write amplification (#…
Browse files Browse the repository at this point in the history
…1159)

Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Apr 24, 2024
1 parent d66a536 commit e81a0bd
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void startStreamObjectsCompactions() {
scheduledCompactionTaskFuture = streamObjectCompactionScheduler.scheduleWithFixedDelay(() -> {
List<StreamWrapper> operationStreams = new ArrayList<>(openedStreams.values());
operationStreams.forEach(StreamWrapper::compact);
}, 1, 1, TimeUnit.MINUTES);
}, 5, 5, TimeUnit.MINUTES);
}

private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.automq.stream.api.Stream;
import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.objects.CompactStreamObjectRequest;
import com.automq.stream.s3.objects.ObjectManager;
Expand Down Expand Up @@ -136,14 +137,15 @@ void compact0(CompactionType compactionType) throws ExecutionException, Interrup
// TODO: find a better way to cleanup the single head object
continue;
}
TimerUtil start = new TimerUtil();
long objectId = objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)).get();
Optional<CompactStreamObjectRequest> requestOpt = new StreamObjectGroupCompactor(streamId, stream.streamEpoch(),
startOffset, objectGroup, objectId, dataBlockGroupSizeThreshold, s3Operator).compact();
if (requestOpt.isPresent()) {
request = requestOpt.get();
objectManager.compactStreamObject(request).get();
if (s3ObjectLogger.isTraceEnabled()) {
s3ObjectLogger.trace("{}", request);
s3ObjectLogger.trace("{} cost {}ms", request, start.elapsedAs(TimeUnit.MILLISECONDS));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public class Defaults {
public static final int S3_STREAM_SET_OBJECT_COMPACTION_MAX_OBJECT_NUM = 500;
public static final int S3_MAX_STREAM_NUM_PER_STREAM_SET_OBJECT = 100000;
public static final int S3_MAX_STREAM_OBJECT_NUM_PER_COMMIT = 10000;
public static final long S3_OBJECT_DELETE_RETENTION_MINUTES = 10; // 10min
public static final long S3_OBJECT_DELETE_RETENTION_MINUTES = 5; // 5min
public static final long S3_NETWORK_BASELINE_BANDWIDTH = 100 * 1024 * 1024; // 100MB/s
public static final int S3_REFILL_PERIOD_MS = 1000; // 1s
public static final int S3_METRICS_EXPORTER_REPORT_INTERVAL_MS = 30000; // 30s
Expand Down

0 comments on commit e81a0bd

Please sign in to comment.