diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 7ed907e50f..ac197ce0d6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -74,6 +74,7 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; import software.amazon.awssdk.services.s3.model.CompletedPart; @@ -408,7 +409,9 @@ private void write0(String path, ByteBuf data, CompletableFuture cf) { if (null != tagging) { builder.tagging(tagging); } - PutObjectRequest request = builder.build(); + PutObjectRequest request = builder + .checksumAlgorithm(ChecksumAlgorithm.CRC32_C) + .build(); AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers()); writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> { S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize); @@ -554,7 +557,7 @@ void createMultipartUpload0(String path, CompletableFuture cf) { if (null != tagging) { builder.tagging(tagging); } - CreateMultipartUploadRequest request = builder.build(); + CreateMultipartUploadRequest request = builder.checksumAlgorithm(ChecksumAlgorithm.CRC32_C).build(); writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> { S3OperationStats.getInstance().createMultiPartUploadStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); cf.complete(createMultipartUploadResponse.uploadId()); @@ -598,8 +601,13 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p TimerUtil timerUtil = new TimerUtil(); int size = part.readableBytes(); AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers()); - UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId) - .partNumber(partNumber).build(); + UploadPartRequest request = UploadPartRequest.builder() + .bucket(bucket) + .key(path) + .uploadId(uploadId) + .partNumber(partNumber) + .checksumAlgorithm(ChecksumAlgorithm.CRC32_C) + .build(); CompletableFuture uploadPartCf = writeS3Client.uploadPart(request, body); uploadPartCf.thenAccept(uploadPartResponse -> { S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size);