diff --git a/data-prepper-pipeline-parser/build.gradle b/data-prepper-pipeline-parser/build.gradle index c270ad0dcd..09c89eb15c 100644 --- a/data-prepper-pipeline-parser/build.gradle +++ b/data-prepper-pipeline-parser/build.gradle @@ -25,6 +25,7 @@ dependencies { exclude group: 'commons-logging', module: 'commons-logging' } implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1' + implementation 'software.amazon.awssdk:arns' testImplementation testLibs.bundles.junit testImplementation testLibs.bundles.mockito testImplementation testLibs.hamcrest diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java index a9e6b50f45..d41080436c 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluatorResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.arns.Arn; import javax.xml.transform.TransformerException; import java.io.IOException; @@ -367,6 +368,10 @@ public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){ return s3Prefix+"/"+envSourceCoordinationIdentifier; } + public String getAccountIdFromRole(final String roleArn) { + return Arn.fromString(roleArn).accountId().orElse(null); + } + /** * Invokes a method dynamically on a given object. * diff --git a/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml b/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml index bd9d0b6a89..6f47151d6d 100644 --- a/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml +++ b/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml @@ -27,6 +27,7 @@ path_prefix: "${getMetadata(\"s3_partition_key\")}" codec: event_json: + default_bucket_owner: "<>.source.documentdb.aws.sts_role_arn>>" - s3: routes: - stream_load @@ -46,7 +47,7 @@ path_prefix: "${getMetadata(\"s3_partition_key\")}" codec: event_json: - + default_bucket_owner: "<>.source.documentdb.aws.sts_role_arn>>" "<>-s3": workers: "<<$.<>.workers>>" delay: "<<$.<>.delay>>" diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java index e219ba2208..f61d53c4a0 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java @@ -168,6 +168,8 @@ void setUp() { when(expressionEvaluator.isValidFormatExpression(anyString())).thenReturn(true); when(s3SinkConfig.getDefaultBucket()).thenReturn(null); + when(s3SinkConfig.getBucketOwners()).thenReturn(null); + when(s3SinkConfig.getDefaultBucketOwner()).thenReturn(null); } private S3Sink createObjectUnderTest() { diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java index a9ab424eee..739ac876df 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java @@ -62,6 +62,7 @@ import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory; import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -140,6 +141,9 @@ class S3SinkServiceIT { @Mock private ExpressionEvaluator expressionEvaluator; + @Mock + private BucketOwnerProvider bucketOwnerProvider; + private OutputCodec codec; private KeyGenerator keyGenerator; @@ -270,7 +274,7 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx private S3SinkService createObjectUnderTest() { OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList()); final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig); - s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3AsyncClient); + s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3AsyncClient, bucketOwnerProvider); return new S3SinkService(s3SinkConfig, codecContext, Duration.ofSeconds(5), pluginMetrics, s3GroupManager); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java index 1c71b9e6db..b4538acaf8 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java @@ -6,6 +6,7 @@ import org.apache.parquet.io.PositionOutputStream; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.async.AsyncRequestBody; @@ -59,6 +60,9 @@ public class S3OutputStream extends PositionOutputStream { private final byte[] buf; private final S3AsyncClient s3Client; + + private final BucketOwnerProvider bucketOwnerProvider; + /** * Collection of the etags for the parts that have been uploaded */ @@ -93,7 +97,8 @@ public class S3OutputStream extends PositionOutputStream { public S3OutputStream(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, - final String defaultBucket) { + final String defaultBucket, + final BucketOwnerProvider bucketOwnerProvider) { this.s3Client = s3Client; this.bucket = bucketSupplier.get(); this.key = keySupplier.get(); @@ -103,6 +108,7 @@ public S3OutputStream(final S3AsyncClient s3Client, open = true; this.defaultBucket = defaultBucket; this.executorService = Executors.newSingleThreadExecutor(); + this.bucketOwnerProvider = bucketOwnerProvider; } @Override @@ -191,6 +197,7 @@ public CompletableFuture close(final Consumer runOnCompletion, final .build(); CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder() .bucket(bucket) + .expectedBucketOwner(bucketOwnerProvider.getBucketOwner(defaultBucket).orElse(null)) .key(key) .uploadId(uploadId) .multipartUpload(completedMultipartUpload) @@ -250,6 +257,7 @@ private void uploadPart() { int partNumber = etags.size() + 1; UploadPartRequest uploadRequest = UploadPartRequest.builder() .bucket(bucket) + .expectedBucketOwner(bucketOwnerProvider.getBucketOwner(defaultBucket).orElse(null)) .key(key) .uploadId(uploadId) .partNumber(partNumber) @@ -278,6 +286,7 @@ private void createMultipartUpload() { CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder() .bucket(bucket) .key(key) + .expectedBucketOwner(bucketOwnerProvider.getBucketOwner(bucket).orElse(null)) .build(); CompletableFuture multipartUpload = s3Client.createMultipartUpload(uploadRequest); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index e1dd406eb1..4aa2898476 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -31,6 +31,8 @@ import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory; import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.ConfigBucketOwnerProviderFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -71,6 +73,8 @@ public S3Sink(final PluginSetting pluginSetting, this.sinkContext = sinkContext; final PluginModel codecConfiguration = s3SinkConfig.getCodec(); final CodecFactory codecFactory = new CodecFactory(pluginFactory, codecConfiguration); + final ConfigBucketOwnerProviderFactory configBucketOwnerProviderFactory = new ConfigBucketOwnerProviderFactory(); + final BucketOwnerProvider bucketOwnerProvider = configBucketOwnerProviderFactory.createBucketOwnerProvider(s3SinkConfig); final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); @@ -112,7 +116,7 @@ public S3Sink(final PluginSetting pluginSetting, testCodec.validateAgainstCodecContext(s3OutputCodecContext); final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig); - final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client); + final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client, bucketOwnerProvider); s3SinkService = new S3SinkService(s3SinkConfig, s3OutputCodecContext, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java index eb9372bcf5..eb12f0790b 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java @@ -10,6 +10,7 @@ import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.aws.validator.AwsAccountId; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; @@ -18,6 +19,8 @@ import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; +import java.util.Map; + /** * s3 sink configuration class contains properties, used to read yaml configuration. */ @@ -74,6 +77,13 @@ public class S3SinkConfig { @JsonProperty("max_retries") private int maxUploadRetries = DEFAULT_UPLOAD_RETRIES; + @JsonProperty("bucket_owners") + private Map bucketOwners; + + @JsonProperty("default_bucket_owner") + @AwsAccountId + private String defaultBucketOwner; + /** * Aws Authentication configuration Options. * @return aws authentication options. @@ -154,4 +164,12 @@ public CompressionOption getCompression() { } public String getDefaultBucket() { return defaultBucket; } + + public Map getBucketOwners() { + return bucketOwners; + } + + public String getDefaultBucketOwner() { + return defaultBucketOwner; + } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java index 7447182383..84ad85fdd8 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java @@ -5,10 +5,11 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.function.Supplier; public interface BufferFactory { - Buffer getBuffer(S3AsyncClient s3Client, Supplier bucketSupplier, Supplier keySupplier, String defaultBucket); + Buffer getBuffer(S3AsyncClient s3Client, Supplier bucketSupplier, Supplier keySupplier, String defaultBucket, BucketOwnerProvider bucketOwnerProvider); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java index 0f075906ae..9fd051b3d5 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.async.AsyncRequestBody; @@ -25,26 +26,35 @@ class BufferUtilities { static final String INVALID_BUCKET = "The specified bucket is not valid"; static CompletableFuture putObjectOrSendToDefaultBucket(final S3AsyncClient s3Client, - final AsyncRequestBody requestBody, - final Consumer runOnCompletion, - final Consumer runOnFailure, - final String objectKey, - final String targetBucket, - final String defaultBucket) { + final AsyncRequestBody requestBody, + final Consumer runOnCompletion, + final Consumer runOnFailure, + final String objectKey, + final String targetBucket, + final String defaultBucket, + final BucketOwnerProvider bucketOwnerProvider) { final boolean[] defaultBucketAttempted = new boolean[1]; return s3Client.putObject( - PutObjectRequest.builder().bucket(targetBucket).key(objectKey).build(), requestBody) + PutObjectRequest.builder() + .bucket(targetBucket) + .key(objectKey) + .expectedBucketOwner(bucketOwnerProvider.getBucketOwner(targetBucket).orElse(null)) + .build(), requestBody) .handle((result, ex) -> { if (ex != null) { runOnFailure.accept(ex); if (defaultBucket != null && - (ex instanceof NoSuchBucketException || ex.getMessage().contains(ACCESS_DENIED) || ex.getMessage().contains(INVALID_BUCKET))) { + (ex instanceof NoSuchBucketException || ex.getCause() instanceof NoSuchBucketException || ex.getMessage().contains(ACCESS_DENIED) || ex.getMessage().contains(INVALID_BUCKET))) { LOG.warn("Bucket {} could not be accessed, attempting to send to default_bucket {}", targetBucket, defaultBucket); defaultBucketAttempted[0] = true; return s3Client.putObject( - PutObjectRequest.builder().bucket(defaultBucket).key(objectKey).build(), + PutObjectRequest.builder() + .bucket(defaultBucket) + .key(objectKey) + .expectedBucketOwner(bucketOwnerProvider.getBucketOwner(defaultBucket).orElse(null)) + .build(), requestBody); } else { runOnCompletion.accept(false); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java index f27f8a2642..d263926849 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java @@ -1,6 +1,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; import org.opensearch.dataprepper.plugins.sink.s3.codec.BufferedCodec; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.function.Supplier; @@ -18,8 +19,9 @@ public CodecBufferFactory(BufferFactory innerBufferFactory, BufferedCodec codec) public Buffer getBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, - final String defaultBucket) { - Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); + final String defaultBucket, + final BucketOwnerProvider bucketOwnerProvider) { + Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); return new CodecBuffer(innerBuffer, bufferedCodec); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java index b0341f5bc8..f79cdd0779 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.Objects; @@ -17,7 +18,9 @@ public class CompressionBufferFactory implements BufferFactory { private final CompressionEngine compressionEngine; private final boolean compressionInternal; - public CompressionBufferFactory(final BufferFactory innerBufferFactory, final CompressionEngine compressionEngine, final OutputCodec codec) { + public CompressionBufferFactory(final BufferFactory innerBufferFactory, + final CompressionEngine compressionEngine, + final OutputCodec codec) { this.innerBufferFactory = Objects.requireNonNull(innerBufferFactory); this.compressionEngine = Objects.requireNonNull(compressionEngine); compressionInternal = Objects.requireNonNull(codec).isCompressionInternal(); @@ -27,8 +30,9 @@ public CompressionBufferFactory(final BufferFactory innerBufferFactory, final Co public Buffer getBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, - final String defaultBucket) { - final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); + final String defaultBucket, + final BucketOwnerProvider bucketOwnerProvider) { + final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); if(compressionInternal) return internalBuffer; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java index 122c6b7e0c..5334f42313 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; import org.apache.commons.lang3.time.StopWatch; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -28,6 +29,8 @@ public class InMemoryBuffer implements Buffer { private final S3AsyncClient s3Client; private final Supplier bucketSupplier; private final Supplier keySupplier; + + private final BucketOwnerProvider bucketOwnerProvider; private int eventCount; private final StopWatch watch; private boolean isCodecStarted; @@ -39,7 +42,8 @@ public class InMemoryBuffer implements Buffer { InMemoryBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, - final String defaultBucket) { + final String defaultBucket, + final BucketOwnerProvider bucketOwnerProvider) { this.s3Client = s3Client; this.bucketSupplier = bucketSupplier; this.keySupplier = keySupplier; @@ -49,6 +53,7 @@ public class InMemoryBuffer implements Buffer { watch.start(); isCodecStarted = false; this.defaultBucket = defaultBucket; + this.bucketOwnerProvider = bucketOwnerProvider; } @Override @@ -73,7 +78,7 @@ public Optional> flushToS3(final Consumer consumeO final byte[] byteArray = byteArrayOutputStream.toByteArray(); return Optional.ofNullable(BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, AsyncRequestBody.fromBytes(byteArray), consumeOnCompletion, consumeOnException, - getKey(), getBucket(), defaultBucket)); + getKey(), getBucket(), defaultBucket, bucketOwnerProvider)); } private String getBucket() { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java index 32d56fd7c9..8e9cb8c7d9 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.function.Supplier; @@ -14,7 +15,8 @@ public class InMemoryBufferFactory implements BufferFactory { public Buffer getBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, - final String defaultBucket) { - return new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); + final String defaultBucket, + final BucketOwnerProvider bucketOwnerProvider) { + return new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java index 550ee4702e..eec6c77996 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; import org.apache.commons.lang3.time.StopWatch; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.async.AsyncRequestBody; @@ -37,6 +38,8 @@ public class LocalFileBuffer implements Buffer { private final S3AsyncClient s3Client; private final Supplier bucketSupplier; private final Supplier keySupplier; + + private final BucketOwnerProvider bucketOwnerProvider; private int eventCount; private final StopWatch watch; private final File localFile; @@ -51,7 +54,8 @@ public class LocalFileBuffer implements Buffer { final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, - final String defaultBucket) throws FileNotFoundException { + final String defaultBucket, + final BucketOwnerProvider bucketOwnerProvider) throws FileNotFoundException { localFile = tempFile; outputStream = new BufferedOutputStream(new FileOutputStream(tempFile), 32 * 1024); this.s3Client = s3Client; @@ -62,6 +66,7 @@ public class LocalFileBuffer implements Buffer { watch.start(); isCodecStarted = false; this.defaultBucket = defaultBucket; + this.bucketOwnerProvider = bucketOwnerProvider; } @Override @@ -93,7 +98,7 @@ public Optional> flushToS3(final Consumer consumeO final CompletableFuture putObjectResponseCompletableFuture = BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, AsyncRequestBody.fromFile(localFile), consumeOnCompletion, consumeOnException, - getKey(), getBucket(), defaultBucket) + getKey(), getBucket(), defaultBucket, bucketOwnerProvider) .whenComplete(((response, throwable) -> removeTemporaryFile())); return Optional.of(putObjectResponseCompletableFuture); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java index da787a9794..b3eb3caf42 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -23,12 +24,13 @@ public class LocalFileBufferFactory implements BufferFactory { public Buffer getBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, - final String defaultBucket) { + final String defaultBucket, + final BucketOwnerProvider bucketOwnerProvider) { File tempFile = null; Buffer localfileBuffer = null; try { tempFile = File.createTempFile(PREFIX, SUFFIX); - localfileBuffer = new LocalFileBuffer(tempFile, s3Client, bucketSupplier, keySupplier, defaultBucket); + localfileBuffer = new LocalFileBuffer(tempFile, s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); } catch (IOException e) { LOG.error("Unable to create temp file ", e); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java index 321e294a38..55d8cec616 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; import org.opensearch.dataprepper.plugins.codec.parquet.S3OutputStream; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.function.Supplier; @@ -15,7 +16,8 @@ public class MultipartBufferFactory implements BufferFactory { public Buffer getBuffer(final S3AsyncClient s3Client, final Supplier bucketSupplier, final Supplier keySupplier, - final String defaultBucket) { - return new MultipartBuffer(new S3OutputStream(s3Client, bucketSupplier, keySupplier, defaultBucket)); + final String defaultBucket, + final BucketOwnerProvider bucketOwnerProvider) { + return new MultipartBuffer(new S3OutputStream(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider)); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java index 1b2f08ca9f..beae9ed157 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -33,6 +34,8 @@ public class S3GroupManager { private final S3AsyncClient s3Client; + private final BucketOwnerProvider bucketOwnerProvider; + private long totalGroupSize; @@ -40,13 +43,15 @@ public S3GroupManager(final S3SinkConfig s3SinkConfig, final S3GroupIdentifierFactory s3GroupIdentifierFactory, final BufferFactory bufferFactory, final CodecFactory codecFactory, - final S3AsyncClient s3Client) { + final S3AsyncClient s3Client, + final BucketOwnerProvider bucketOwnerProvider) { this.s3SinkConfig = s3SinkConfig; this.s3GroupIdentifierFactory = s3GroupIdentifierFactory; this.bufferFactory = bufferFactory; this.codecFactory = codecFactory; this.s3Client = s3Client; totalGroupSize = 0; + this.bucketOwnerProvider = bucketOwnerProvider; } public boolean hasNoGroups() { @@ -74,7 +79,7 @@ public S3Group getOrCreateGroupForEvent(final Event event) { if (allGroups.containsKey(s3GroupIdentifier)) { return allGroups.get(s3GroupIdentifier); } else { - final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3GroupIdentifier::getFullBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey, s3SinkConfig.getDefaultBucket()); + final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3GroupIdentifier::getFullBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey, s3SinkConfig.getDefaultBucket(), bucketOwnerProvider); final OutputCodec outputCodec = codecFactory.provideCodec(); final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup, outputCodec); allGroups.put(s3GroupIdentifier, s3Group); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/BucketOwnerProvider.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/BucketOwnerProvider.java new file mode 100644 index 0000000000..3a5f55c3ad --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/BucketOwnerProvider.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.ownership; + +import java.util.Optional; + +/** + * Gets the expected owner of an S3 bucket. + */ +@FunctionalInterface +public interface BucketOwnerProvider { + /** + * Gets the accountId of the owner bucket. Returns an empty optional + * if no account owner is known. + * @param bucket the name of the bucket + * @return The accountId or empty + */ + Optional getBucketOwner(final String bucket); +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/ConfigBucketOwnerProviderFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/ConfigBucketOwnerProviderFactory.java new file mode 100644 index 0000000000..439cea7ac8 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/ConfigBucketOwnerProviderFactory.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.ownership; + +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; +import software.amazon.awssdk.arns.Arn; + +/** + * Produces a {@link BucketOwnerProvider} from the S3 sink configuration as + * provided in a {@link S3SinkConfig}. + */ +public class ConfigBucketOwnerProviderFactory { + /** + * Creates the {@link BucketOwnerProvider} + * @param s3SinkConfig The input {@link S3SinkConfig} + * @return The {@link BucketOwnerProvider} + */ + public BucketOwnerProvider createBucketOwnerProvider(final S3SinkConfig s3SinkConfig) { + if (s3SinkConfig.getDefaultBucketOwner() == null && s3SinkConfig.getBucketOwners() == null) { + return new NoOwnershipBucketOwnerProvider(); + } + + final StaticBucketOwnerProvider staticBucketOwnerProvider = getStaticBucketOwnerProvider(s3SinkConfig); + + if(s3SinkConfig.getBucketOwners() != null && !s3SinkConfig.getBucketOwners().isEmpty()) { + return new MappedBucketOwnerProvider(s3SinkConfig.getBucketOwners(), staticBucketOwnerProvider); + } else { + return staticBucketOwnerProvider; + } + } + + private StaticBucketOwnerProvider getStaticBucketOwnerProvider(final S3SinkConfig s3SinkConfig) { + final String accountId; + + if(s3SinkConfig.getDefaultBucketOwner() != null) + accountId = s3SinkConfig.getDefaultBucketOwner(); + else if(s3SinkConfig.getAwsAuthenticationOptions() != null && s3SinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn() != null) + accountId = extractStsRoleArnAccountId(s3SinkConfig); + else + throw new InvalidPluginConfigurationException( + "The S3 sink is unable to determine a bucket owner. Configure the default_bucket_owner for the account Id that owns the bucket. You may also want to configure bucket_owners if you write to S3 buckets in different accounts."); + + return new StaticBucketOwnerProvider(accountId); + } + + private String extractStsRoleArnAccountId(final S3SinkConfig s3SinkConfig) { + final Arn roleArn = Arn.fromString(s3SinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn()); + + if (roleArn.accountId().isPresent()) { + return roleArn.accountId().get(); + } + + throw new RuntimeException(String.format("Unable to extract account id from sts_role_arn %s", s3SinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn())); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/MappedBucketOwnerProvider.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/MappedBucketOwnerProvider.java new file mode 100644 index 0000000000..5bd4e62b84 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/MappedBucketOwnerProvider.java @@ -0,0 +1,31 @@ +package org.opensearch.dataprepper.plugins.sink.s3.ownership; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * Implements {@link BucketOwnerProvider} using a mapping of bucket + * names to account Ids for the bucket owners. Uses a delegate + * {@link BucketOwnerProvider} as a fallback when the bucket is not + * found in the map. + */ +class MappedBucketOwnerProvider implements BucketOwnerProvider { + private final Map bucketOwnershipMap; + private final BucketOwnerProvider fallbackProvider; + + MappedBucketOwnerProvider(Map bucketOwnershipMap, BucketOwnerProvider fallbackProvider) { + this.bucketOwnershipMap = new HashMap<>(Objects.requireNonNull(bucketOwnershipMap)); + this.fallbackProvider = Objects.requireNonNull(fallbackProvider); + } + + @Override + public Optional getBucketOwner(String bucket) { + String account = bucketOwnershipMap.get(bucket); + if(account != null) { + return Optional.of(account); + } + return fallbackProvider.getBucketOwner(bucket); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/NoOwnershipBucketOwnerProvider.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/NoOwnershipBucketOwnerProvider.java new file mode 100644 index 0000000000..04586165ec --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/NoOwnershipBucketOwnerProvider.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.ownership; + +import java.util.Optional; + +/** + * An implementation of {@link BucketOwnerProvider} which does not provide + * a bucket owner, effectively skipping owner validation. + */ +class NoOwnershipBucketOwnerProvider implements BucketOwnerProvider { + @Override + public Optional getBucketOwner(final String bucket) { + return Optional.empty(); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/StaticBucketOwnerProvider.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/StaticBucketOwnerProvider.java new file mode 100644 index 0000000000..8ad1b6973d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/StaticBucketOwnerProvider.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.ownership; + +import java.util.Objects; +import java.util.Optional; + +/** + * An implementation of {@link BucketOwnerProvider} which provides the + * same owner for all buckets. + */ +class StaticBucketOwnerProvider implements BucketOwnerProvider { + private final String accountId; + + public StaticBucketOwnerProvider(final String accountId) { + this.accountId = Objects.requireNonNull(accountId); + } + + @Override + public Optional getBucketOwner(final String bucket) { + return Optional.of(accountId); + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStreamTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStreamTest.java index 3709a507fc..25953408e8 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStreamTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStreamTest.java @@ -11,6 +11,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; @@ -50,6 +51,9 @@ public class S3OutputStreamTest { @Mock private Consumer runOnError; + @Mock + private BucketOwnerProvider bucketOwnerProvider; + private String bucket; private String defaultBucket; @@ -64,7 +68,7 @@ void setup() { } private S3OutputStream createObjectUnderTest() { - return new S3OutputStream(s3Client, () -> bucket, () -> objectKey, defaultBucket); + return new S3OutputStream(s3Client, () -> bucket, () -> objectKey, defaultBucket, bucketOwnerProvider); } @Test diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java index 9b1a99046d..6a38ec3d21 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java @@ -99,6 +99,7 @@ void setUp() { when(objectKeyOptions.getNamePattern()).thenReturn(UUID.randomUUID().toString()); when(s3SinkConfig.getObjectKeyOptions()).thenReturn(objectKeyOptions); when(expressionEvaluator.isValidFormatExpression(anyString())).thenReturn(true); + when(s3SinkConfig.getDefaultBucketOwner()).thenReturn(UUID.randomUUID().toString()); } private S3Sink createObjectUnderTest() { diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java index f438631b4c..6107e75205 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java @@ -14,6 +14,7 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.NoSuchBucketException; @@ -22,6 +23,7 @@ import software.amazon.awssdk.services.s3.model.S3Exception; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -31,6 +33,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -60,6 +63,9 @@ public class BufferUtilitiesTest { @Mock private S3AsyncClient s3Client; + @Mock + private BucketOwnerProvider bucketOwnerProvider; + @BeforeEach void setup() { targetBucket = UUID.randomUUID().toString(); @@ -74,7 +80,7 @@ void putObjectOrSendToDefaultBucket_with_no_exception_sends_to_target_bucket() { when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenReturn(successfulFuture); - BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket).join(); + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket, bucketOwnerProvider).join(); final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); verify(s3Client, times(1)).putObject(argumentCaptor.capture(), eq(requestBody)); @@ -95,7 +101,7 @@ void putObjectOrSendToDefaultBucket_with_no_such_bucket_exception_and_null_defau final CompletableFuture failedFuture = CompletableFuture.failedFuture(NoSuchBucketException.builder().build()); when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenReturn(failedFuture); - BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, null).join(); + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, null, bucketOwnerProvider).join(); verify(s3Client, times(1)).putObject(any(PutObjectRequest.class), eq(requestBody)); verify(mockRunOnCompletion).accept(false); @@ -109,7 +115,7 @@ void putObjectOrSendToDefaultBucket_with_S3Exception_that_is_not_access_denied_o when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenReturn(failedFuture); BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, - defaultBucketEnabled ? defaultBucket : null); + defaultBucketEnabled ? defaultBucket : null, bucketOwnerProvider); verify(s3Client, times(1)).putObject(any(PutObjectRequest.class), eq(requestBody)); verify(mockRunOnCompletion).accept(false); @@ -123,7 +129,8 @@ void putObjectOrSendToDefaultBucket_with_NoSuchBucketException_or_access_denied_ final CompletableFuture failedFuture = CompletableFuture.failedFuture(exception); when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenReturn(failedFuture).thenReturn(successfulFuture); - BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket); + when(bucketOwnerProvider.getBucketOwner(anyString())).thenReturn(Optional.empty()); + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket, bucketOwnerProvider); final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); verify(s3Client, times(2)).putObject(argumentCaptor.capture(), eq(requestBody)); @@ -134,10 +141,12 @@ void putObjectOrSendToDefaultBucket_with_NoSuchBucketException_or_access_denied_ final PutObjectRequest putObjectRequest = putObjectRequestList.get(0); assertThat(putObjectRequest.bucket(), equalTo(targetBucket)); assertThat(putObjectRequest.key(), equalTo(objectKey)); + assertThat(putObjectRequest.expectedBucketOwner(), equalTo(null)); final PutObjectRequest defaultBucketPutObjectRequest = putObjectRequestList.get(1); assertThat(defaultBucketPutObjectRequest.bucket(), equalTo(defaultBucket)); assertThat(defaultBucketPutObjectRequest.key(), equalTo(objectKey)); + assertThat(defaultBucketPutObjectRequest.expectedBucketOwner(), equalTo(null)); final InOrder inOrder = Mockito.inOrder(mockRunOnCompletion, mockRunOnFailure); @@ -154,7 +163,12 @@ void putObject_failing_to_send_to_bucket_and_default_bucket_completes_as_expecte final CompletableFuture failedFuture = CompletableFuture.failedFuture(noSuchBucketException); when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenReturn(failedFuture).thenReturn(failedDefaultBucket); - BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket); + final String bucketOwner = UUID.randomUUID().toString(); + final String defaultBucketOwner = UUID.randomUUID().toString(); + when(bucketOwnerProvider.getBucketOwner(targetBucket)).thenReturn(Optional.of(bucketOwner)); + when(bucketOwnerProvider.getBucketOwner(defaultBucket)).thenReturn(Optional.of(defaultBucketOwner)); + + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, mockRunOnCompletion, mockRunOnFailure, objectKey, targetBucket, defaultBucket, bucketOwnerProvider); final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); verify(s3Client, times(2)).putObject(argumentCaptor.capture(), eq(requestBody)); @@ -165,10 +179,12 @@ void putObject_failing_to_send_to_bucket_and_default_bucket_completes_as_expecte final PutObjectRequest putObjectRequest = putObjectRequestList.get(0); assertThat(putObjectRequest.bucket(), equalTo(targetBucket)); assertThat(putObjectRequest.key(), equalTo(objectKey)); + assertThat(putObjectRequest.expectedBucketOwner(), equalTo(bucketOwner)); final PutObjectRequest defaultBucketPutObjectRequest = putObjectRequestList.get(1); assertThat(defaultBucketPutObjectRequest.bucket(), equalTo(defaultBucket)); assertThat(defaultBucketPutObjectRequest.key(), equalTo(objectKey)); + assertThat(defaultBucketPutObjectRequest.expectedBucketOwner(), equalTo(defaultBucketOwner)); final InOrder inOrder = Mockito.inOrder(mockRunOnCompletion, mockRunOnFailure); @@ -188,6 +204,7 @@ public Stream provideArguments(ExtensionContext extensionCo when(invalidBucketException.getMessage()).thenReturn(UUID.randomUUID() + INVALID_BUCKET + UUID.randomUUID()); final NoSuchBucketException noSuchBucketException = mock(NoSuchBucketException.class); + when(noSuchBucketException.getCause()).thenReturn(noSuchBucketException); return Stream.of( Arguments.arguments(noSuchBucketException), diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java index 0f27b69de5..d9c1384ae7 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java @@ -13,6 +13,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.UUID; @@ -45,6 +46,9 @@ class CompressionBufferFactoryTest { @Mock private Supplier keySupplier; + @Mock + private BucketOwnerProvider bucketOwnerProvider; + @Mock private OutputCodec codec; @@ -87,21 +91,21 @@ class WithBuffer { @BeforeEach void setUp() { - when(innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket)).thenReturn(innerBuffer); + when(innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider)).thenReturn(innerBuffer); } @Test void getBuffer_returns_CompressionBuffer() { - final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); + final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); assertThat(buffer, instanceOf(CompressionBuffer.class)); } @Test void getBuffer_returns_new_on_each_call() { final CompressionBufferFactory objectUnderTest = createObjectUnderTest(); - final Buffer firstBuffer = objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); + final Buffer firstBuffer = objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); - assertThat(objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket), not(equalTo(firstBuffer))); + assertThat(objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider), not(equalTo(firstBuffer))); } @Nested @@ -113,17 +117,17 @@ void setUp() { @Test void getBuffer_returns_innerBuffer_directly() { - final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); + final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); assertThat(buffer, sameInstance(innerBuffer)); } @Test void getBuffer_calls_on_each_call() { final CompressionBufferFactory objectUnderTest = createObjectUnderTest(); - objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); - objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); + objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); + objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); - verify(innerBufferFactory, times(2)).getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); + verify(innerBufferFactory, times(2)).getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); } } } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java index a16ad57c60..33cca9dd99 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java @@ -22,7 +22,7 @@ void test_inMemoryBufferFactory_notNull(){ void test_buffer_notNull(){ InMemoryBufferFactory inMemoryBufferFactory = new InMemoryBufferFactory(); Assertions.assertNotNull(inMemoryBufferFactory); - Buffer buffer = inMemoryBufferFactory.getBuffer(null, null, null, null); + Buffer buffer = inMemoryBufferFactory.getBuffer(null, null, null, null, null); Assertions.assertNotNull(buffer); assertThat(buffer, instanceOf(Buffer.class)); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java index 5bd0539152..ea42dafd27 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java @@ -13,6 +13,7 @@ import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectResponse; @@ -57,11 +58,14 @@ class InMemoryBufferTest { @Mock private Consumer mockRunOnFailure; + @Mock + private BucketOwnerProvider bucketOwnerProvider; + private InMemoryBuffer inMemoryBuffer; @Test void test_with_write_event_into_buffer() throws IOException { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { OutputStream outputStream = inMemoryBuffer.getOutputStream(); @@ -89,7 +93,7 @@ void test_with_write_event_into_buffer() throws IOException { */ void getDuration_provides_duration_within_expected_range() throws IOException, InterruptedException { Instant startTime = Instant.now(); - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); Instant endTime = Instant.now(); @@ -118,7 +122,7 @@ void test_flush_to_s3_success() { when(keySupplier.get()).thenReturn(key); when(bucketSupplier.get()).thenReturn(bucket); - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); Assertions.assertNotNull(inMemoryBuffer); final CompletableFuture expectedFuture = mock(CompletableFuture.class); @@ -126,7 +130,7 @@ void test_flush_to_s3_success() { try (final MockedStatic bufferUtilitiesMockedStatic = mockStatic(BufferUtilities.class)) { bufferUtilitiesMockedStatic.when(() -> BufferUtilities.putObjectOrSendToDefaultBucket(eq(s3Client), any(AsyncRequestBody.class), - eq(mockRunOnCompletion), eq(mockRunOnFailure), eq(key), eq(bucket), eq(null))) + eq(mockRunOnCompletion), eq(mockRunOnFailure), eq(key), eq(bucket), eq(null), eq(bucketOwnerProvider))) .thenReturn(expectedFuture); final Optional> result = inMemoryBuffer.flushToS3(mockRunOnCompletion, mockRunOnFailure); @@ -139,14 +143,14 @@ void test_flush_to_s3_success() { @Test void getOutputStream_is_PositionOutputStream() { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); assertThat(inMemoryBuffer.getOutputStream(), instanceOf(PositionOutputStream.class)); } @Test void getOutputStream_getPos_equals_written_size() throws IOException { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { OutputStream outputStream = inMemoryBuffer.getOutputStream(); @@ -163,7 +167,7 @@ void getOutputStream_getPos_equals_written_size() throws IOException { @Test void getSize_across_multiple_in_sequence() throws IOException { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { OutputStream outputStream = inMemoryBuffer.getOutputStream(); @@ -173,7 +177,7 @@ void getSize_across_multiple_in_sequence() throws IOException { } assertThat(inMemoryBuffer.getSize(), equalTo((long) MAX_EVENTS * 1000)); - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null, bucketOwnerProvider); assertThat(inMemoryBuffer.getSize(), equalTo(0L)); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java index 93ec98a94b..fb45e781aa 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java @@ -21,7 +21,7 @@ void test_localFileBufferFactory_notNull() { void test_buffer_notNull() { LocalFileBufferFactory localFileBufferFactory = new LocalFileBufferFactory(); Assertions.assertNotNull(localFileBufferFactory); - Buffer buffer = localFileBufferFactory.getBuffer(null, null, null, null); + Buffer buffer = localFileBufferFactory.getBuffer(null, null, null, null, null); Assertions.assertNotNull(buffer); assertThat(buffer, instanceOf(LocalFileBuffer.class)); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java index 9a839a85f5..fcca931caa 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java @@ -8,6 +8,7 @@ import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectResponse; @@ -56,6 +57,9 @@ class LocalFileBufferTest { @Mock private Consumer mockRunOnFailure; + @Mock + private BucketOwnerProvider bucketOwnerProvider; + private LocalFileBuffer localFileBuffer; private File tempFile; @@ -65,7 +69,7 @@ class LocalFileBufferTest { void setUp() throws IOException { defaultBucket = UUID.randomUUID().toString(); tempFile = File.createTempFile(PREFIX, SUFFIX); - localFileBuffer = new LocalFileBuffer(tempFile, s3Client, bucketSupplier, keySupplier, defaultBucket); + localFileBuffer = new LocalFileBuffer(tempFile, s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider); } @Test @@ -119,7 +123,7 @@ void test_with_write_events_into_buffer_and_flush_toS3() throws IOException { try (final MockedStatic bufferUtilitiesMockedStatic = mockStatic(BufferUtilities.class)) { bufferUtilitiesMockedStatic.when(() -> BufferUtilities.putObjectOrSendToDefaultBucket(eq(s3Client), any(AsyncRequestBody.class), - eq(mockRunOnCompletion), eq(mockRunOnFailure), eq(KEY), eq(BUCKET_NAME), eq(defaultBucket))) + eq(mockRunOnCompletion), eq(mockRunOnFailure), eq(KEY), eq(BUCKET_NAME), eq(defaultBucket), eq(bucketOwnerProvider))) .thenReturn(expectedFuture); final Optional> result = localFileBuffer.flushToS3(mockRunOnCompletion, mockRunOnFailure); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java index 4764c8431f..545b6feb77 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory; +import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.Collection; @@ -50,8 +51,11 @@ public class S3GroupManagerTest { @Mock private S3AsyncClient s3Client; + @Mock + private BucketOwnerProvider bucketOwnerProvider; + private S3GroupManager createObjectUnderTest() { - return new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client); + return new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client, bucketOwnerProvider); } @Test @@ -70,7 +74,7 @@ void getOrCreateGroupForEvent_creates_expected_group_when_it_does_not_exist() { when(s3SinkConfig.getDefaultBucket()).thenReturn(defaultBucket); final Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), eq(bucketOwnerProvider))) .thenAnswer(invocation -> { Supplier bucketSupplier = invocation.getArgument(1); Supplier objectKeySupplier = invocation.getArgument(2); @@ -112,7 +116,7 @@ void getOrCreateGroupForEvent_returns_expected_group_when_it_exists() { final Buffer buffer = mock(Buffer.class); final OutputCodec outputCodec = mock(OutputCodec.class); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), eq(bucketOwnerProvider))) .thenReturn(buffer); when(codecFactory.provideCodec()).thenReturn(outputCodec); @@ -133,7 +137,7 @@ void getOrCreateGroupForEvent_returns_expected_group_when_it_exists() { assertThat(secondResult.getS3GroupIdentifier(), equalTo(s3GroupIdentifier)); assertThat(secondResult.getBuffer(), equalTo(buffer)); - verify(bufferFactory, times(1)).getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket)); + verify(bufferFactory, times(1)).getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), eq(bucketOwnerProvider)); final Collection groups = objectUnderTest.getS3GroupEntries(); assertThat(groups, notNullValue()); @@ -173,7 +177,7 @@ void recalculateAndGetGroupSize_returns_expected_size() { final Buffer thirdBuffer = mock(Buffer.class); when(thirdBuffer.getSize()).thenReturn(bufferSizeBase * 3); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), eq(bucketOwnerProvider))) .thenReturn(buffer).thenReturn(secondBuffer).thenReturn(thirdBuffer); final OutputCodec outputCodec = mock(OutputCodec.class); @@ -219,7 +223,7 @@ void getGroupsOrderedBySize_returns_groups_in_expected_order() { final Buffer thirdBuffer = mock(Buffer.class); when(thirdBuffer.getSize()).thenReturn(bufferSizeBase * 3); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket), eq(bucketOwnerProvider))) .thenReturn(buffer).thenReturn(secondBuffer).thenReturn(thirdBuffer); final OutputCodec firstOutputCodec = mock(OutputCodec.class); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/ConfigBucketOwnerProviderFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/ConfigBucketOwnerProviderFactoryTest.java new file mode 100644 index 0000000000..bcfc00bded --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/ConfigBucketOwnerProviderFactoryTest.java @@ -0,0 +1,95 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.ownership; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; + +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ConfigBucketOwnerProviderFactoryTest { + + @Mock + private S3SinkConfig s3SinkConfig; + private String accountId; + + @BeforeEach + void setUp() { + accountId = RandomStringUtils.randomNumeric(12); + } + + private ConfigBucketOwnerProviderFactory createObjectUnderTest() { + return new ConfigBucketOwnerProviderFactory(); + } + + @Test + void createBucketOwnerProvider_returns_NoOwnershipBucketOwnerProvider_when_bucketOwners_not_provided() { + when(s3SinkConfig.getDefaultBucketOwner()).thenReturn(null); + when(s3SinkConfig.getBucketOwners()).thenReturn(null); + + final BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SinkConfig); + + assertThat(bucketOwnerProvider, instanceOf(NoOwnershipBucketOwnerProvider.class)); + } + + @Test + void createBucketOwnerProvider_returns_ownership_using_default_when_no_bucket_mapping() { + when(s3SinkConfig.getDefaultBucketOwner()).thenReturn(accountId); + + BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SinkConfig); + + assertThat(bucketOwnerProvider, notNullValue()); + + final String bucket = UUID.randomUUID().toString(); + final Optional optionalOwner = bucketOwnerProvider.getBucketOwner(bucket); + + assertThat(optionalOwner, notNullValue()); + assertThat(optionalOwner.isPresent(), equalTo(true)); + assertThat(optionalOwner.get(), equalTo(accountId)); + } + + @Test + void createBucketOwnerProvider_returns_ownership_using_default_when_bucket_mapping_does_not_match() { + when(s3SinkConfig.getBucketOwners()).thenReturn(Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString())); + when(s3SinkConfig.getDefaultBucketOwner()).thenReturn(accountId); + + BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SinkConfig); + + assertThat(bucketOwnerProvider, notNullValue()); + + final String bucket = UUID.randomUUID().toString(); + final Optional optionalOwner = bucketOwnerProvider.getBucketOwner(bucket); + + assertThat(optionalOwner, notNullValue()); + assertThat(optionalOwner.isPresent(), equalTo(true)); + assertThat(optionalOwner.get(), equalTo(accountId)); + } + + @Test + void createBucketOwnerProvider_throws_exception_when_ownership_cannot_be_determined() { + final ConfigBucketOwnerProviderFactory objectUnderTest = createObjectUnderTest(); + final InvalidPluginConfigurationException actualException = assertThrows(InvalidPluginConfigurationException.class, () -> objectUnderTest.createBucketOwnerProvider(s3SinkConfig)); + + assertThat(actualException.getMessage(), containsString("default_bucket_owner")); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/MappedBucketOwnerProviderTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/MappedBucketOwnerProviderTest.java new file mode 100644 index 0000000000..11a9e42474 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/MappedBucketOwnerProviderTest.java @@ -0,0 +1,120 @@ +package org.opensearch.dataprepper.plugins.sink.s3.ownership; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class MappedBucketOwnerProviderTest { + private Map bucketOwnershipMap; + @Mock + private BucketOwnerProvider fallbackProvider; + + private MappedBucketOwnerProvider createObjectUnderTest() { + return new MappedBucketOwnerProvider(bucketOwnershipMap, fallbackProvider); + } + + @Test + void constructor_throws_with_null_ownership_map() { + bucketOwnershipMap = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_with_null_fallback() { + bucketOwnershipMap = Collections.emptyMap(); + fallbackProvider = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Nested + class WithEmptyOwnersMap { + private String bucket; + @BeforeEach + void setUp() { + bucketOwnershipMap = Collections.emptyMap(); + bucket = UUID.randomUUID().toString(); + } + + @Test + void getBucketOwner_returns_owner_from_fallback_when_not_in_map() { + String fallbackAccount = UUID.randomUUID().toString(); + when(fallbackProvider.getBucketOwner(bucket)).thenReturn(Optional.of(fallbackAccount)); + + Optional optionalOwner = createObjectUnderTest().getBucketOwner(bucket); + assertThat(optionalOwner, notNullValue()); + assertThat(optionalOwner.isPresent(), equalTo(true)); + assertThat(optionalOwner.get(), equalTo(fallbackAccount)); + } + + @Test + void getBucketOwner_returns_empty_when_not_in_map_nor_in_fallback() { + when(fallbackProvider.getBucketOwner(bucket)).thenReturn(Optional.empty()); + + Optional optionalOwner = createObjectUnderTest().getBucketOwner(bucket); + assertThat(optionalOwner, notNullValue()); + assertThat(optionalOwner.isPresent(), equalTo(false)); + } + } + + @Nested + class WithOwnersMap { + private String knownBucket; + private String knownBucketAccount; + + @BeforeEach + void setUp() { + knownBucket = UUID.randomUUID().toString(); + knownBucketAccount = UUID.randomUUID().toString(); + bucketOwnershipMap = Map.of( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), + UUID.randomUUID().toString(), UUID.randomUUID().toString(), + knownBucket, knownBucketAccount, + UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + + @Test + void getBucketOwner_returns_owner_from_map_when_found() { + Optional optionalOwner = createObjectUnderTest().getBucketOwner(knownBucket); + assertThat(optionalOwner, notNullValue()); + assertThat(optionalOwner.isPresent(), equalTo(true)); + assertThat(optionalOwner.get(), equalTo(knownBucketAccount)); + } + + @Test + void getBucketOwner_returns_owner_from_fallback_when_not_in_map() { + String otherBucket = UUID.randomUUID().toString(); + String fallbackAccount = UUID.randomUUID().toString(); + when(fallbackProvider.getBucketOwner(otherBucket)).thenReturn(Optional.of(fallbackAccount)); + + Optional optionalOwner = createObjectUnderTest().getBucketOwner(otherBucket); + assertThat(optionalOwner, notNullValue()); + assertThat(optionalOwner.isPresent(), equalTo(true)); + assertThat(optionalOwner.get(), equalTo(fallbackAccount)); + } + + @Test + void getBucketOwner_returns_empty_when_not_in_map_nor_in_fallback() { + String otherBucket = UUID.randomUUID().toString(); + when(fallbackProvider.getBucketOwner(otherBucket)).thenReturn(Optional.empty()); + + Optional optionalOwner = createObjectUnderTest().getBucketOwner(otherBucket); + assertThat(optionalOwner, notNullValue()); + assertThat(optionalOwner.isPresent(), equalTo(false)); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/NoOwnershipBucketOwnerProviderTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/NoOwnershipBucketOwnerProviderTest.java new file mode 100644 index 0000000000..dbc86f9a5b --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/NoOwnershipBucketOwnerProviderTest.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.ownership; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class NoOwnershipBucketOwnerProviderTest { + private NoOwnershipBucketOwnerProvider createObjectUnderTest() { + return new NoOwnershipBucketOwnerProvider(); + } + + @Test + void getBucketOwner_returns_empty() { + final Optional optionalOwner = createObjectUnderTest().getBucketOwner(UUID.randomUUID().toString()); + + assertThat(optionalOwner, notNullValue()); + assertThat(optionalOwner.isPresent(), equalTo(false)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/StaticBucketOwnerProviderTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/StaticBucketOwnerProviderTest.java new file mode 100644 index 0000000000..5c92d36c4c --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ownership/StaticBucketOwnerProviderTest.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.ownership; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class StaticBucketOwnerProviderTest { + private String accountId; + + @BeforeEach + void setUp() { + accountId = UUID.randomUUID().toString(); + } + + private StaticBucketOwnerProvider createObjectUnderTest() { + return new StaticBucketOwnerProvider(accountId); + } + + @Test + void constructor_throws_with_null() { + accountId = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void getBucketOwner_returns_the_predefined_accountId() { + final Optional optionalOwner = createObjectUnderTest().getBucketOwner(UUID.randomUUID().toString()); + + assertThat(optionalOwner, notNullValue()); + assertThat(optionalOwner.isPresent(), equalTo(true)); + assertThat(optionalOwner.get(), equalTo(accountId)); + } +} \ No newline at end of file