diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java index a334221a2525..99f1588f0c53 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java @@ -21,6 +21,8 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalListener; +import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.time.Duration; import java.util.Collections; @@ -55,6 +57,8 @@ import software.amazon.awssdk.core.checksums.SdkChecksum; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.utils.IoUtils; @Value.Immutable public abstract class S3V4RestSignerClient @@ -286,6 +290,7 @@ public SdkHttpFullRequest sign( .uri(request.getUri()) .headers(request.headers()) .properties(requestPropertiesSupplier().get()) + .body(bodyAsString(request)) .build(); Key cacheKey = Key.from(remoteSigningRequest); @@ -328,6 +333,27 @@ public SdkHttpFullRequest sign( return mutableRequest.build(); } + /** + * Only add body for DeleteObjectsRequest. Refer to + * https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_RequestSyntax + */ + private String bodyAsString(SdkHttpFullRequest request) { + if (isDeleteObjectsRequest(request) && request.contentStreamProvider().isPresent()) { + try (InputStream is = request.contentStreamProvider().get().newStream()) { + return IoUtils.toUtf8String(is); + } catch (IOException e) { + LOG.debug("Failed to determine body for S3 sign request", e); + } + } + + return null; + } + + private boolean isDeleteObjectsRequest(SdkHttpFullRequest request) { + return request.method() == SdkHttpMethod.POST + && request.rawQueryParameters().containsKey("delete"); + } + private void reconstructHeaders( Map> signedAndUnsignedHeaders, SdkHttpFullRequest.Builder mutableRequest) { diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java index 6240efa2ad63..bc9fb44988e3 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java @@ -21,6 +21,7 @@ import static java.lang.String.format; import static org.apache.iceberg.rest.RESTCatalogAdapter.castRequest; import static org.apache.iceberg.rest.RESTCatalogAdapter.castResponse; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.InputStreamReader; @@ -32,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.servlet.http.HttpServlet; @@ -41,6 +43,7 @@ import org.apache.hc.core5.http.HttpHeaders; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.io.CharStreams; @@ -78,10 +81,42 @@ public class S3SignerServlet extends HttpServlet { ImmutableMap.of(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); private final ObjectMapper mapper; + private List s3SignRequestValidators = Lists.newArrayList(); + + /** + * SignRequestValidator is a wrapper class used for validating the contents of the S3SignRequest + * and thus verifying the behavior of the client during testing. + */ + public static class SignRequestValidator { + private final Predicate requestMatcher; + private final Predicate requestExpectation; + private final String assertMessage; + + public SignRequestValidator( + Predicate requestExpectation, + Predicate requestMatcher, + String assertMessage) { + this.requestExpectation = requestExpectation; + this.requestMatcher = requestMatcher; + this.assertMessage = assertMessage; + } + + void validateRequest(S3SignRequest request) { + if (requestMatcher.test(request)) { + assertThat(requestExpectation.test(request)).as(assertMessage).isTrue(); + } + } + } + public S3SignerServlet(ObjectMapper mapper) { this.mapper = mapper; } + public S3SignerServlet(ObjectMapper mapper, List s3SignRequestValidators) { + this.mapper = mapper; + this.s3SignRequestValidators = s3SignRequestValidators; + } + @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) { execute(request, response); @@ -188,6 +223,7 @@ protected void execute(HttpServletRequest request, HttpServletResponse response) S3SignRequest s3SignRequest = castRequest( S3SignRequest.class, mapper.readValue(request.getReader(), S3SignRequest.class)); + s3SignRequestValidators.forEach(validator -> validator.validateRequest(s3SignRequest)); S3SignResponse s3SignResponse = signRequest(s3SignRequest); if (CACHEABLE_METHODS.contains(SdkHttpMethod.fromValue(s3SignRequest.method()))) { // tell the client this can be cached diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java index 1e44e533188b..67a5d423b80b 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java @@ -20,12 +20,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.stream.Collectors; import org.apache.iceberg.aws.s3.MinioContainer; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.eclipse.jetty.server.Server; @@ -56,8 +58,11 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.UploadPartRequest; @@ -148,7 +153,15 @@ public void before() throws Exception { } private static Server initHttpServer() throws Exception { - S3SignerServlet servlet = new S3SignerServlet(S3ObjectMapper.mapper()); + S3SignerServlet.SignRequestValidator deleteObjectsWithBody = + new S3SignerServlet.SignRequestValidator( + (s3SignRequest) -> + "post".equalsIgnoreCase(s3SignRequest.method()) + && s3SignRequest.uri().getQuery().contains("delete"), + (s3SignRequest) -> s3SignRequest.body() != null && !s3SignRequest.body().isEmpty(), + "Sign request for delete objects should have a request body"); + S3SignerServlet servlet = + new S3SignerServlet(S3ObjectMapper.mapper(), ImmutableList.of(deleteObjectsWithBody)); ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); servletContext.setContextPath("/"); @@ -177,6 +190,22 @@ public void validatePutObject() { PutObjectRequest.builder().bucket(BUCKET).key("some/key").build(), Paths.get("/etc/hosts")); } + @Test + public void validateDeleteObjects() { + Path sourcePath = Paths.get("/etc/hosts"); + s3.putObject(PutObjectRequest.builder().bucket(BUCKET).key("some/key1").build(), sourcePath); + s3.putObject(PutObjectRequest.builder().bucket(BUCKET).key("some/key2").build(), sourcePath); + + Delete objectsToDelete = + Delete.builder() + .objects( + ObjectIdentifier.builder().key("some/key1").build(), + ObjectIdentifier.builder().key("some/key2").build()) + .build(); + + s3.deleteObjects(DeleteObjectsRequest.builder().bucket(BUCKET).delete(objectsToDelete).build()); + } + @Test public void validateListPrefix() { s3.listObjectsV2(ListObjectsV2Request.builder().bucket(BUCKET).prefix("some/prefix/").build());