Skip to content

Commit

Permalink
Add bucket owner validation support to s3 sink (opensearch-project#4504)
Browse files: browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored May 8, 2024
1 parent f23972c commit f5b0fee
Show file tree
Hide file tree
Showing 36 changed files with 629 additions and 63 deletions.
1 change: 1 addition & 0 deletions data-prepper-pipeline-parser/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
implementation 'software.amazon.awssdk:arns'
testImplementation testLibs.bundles.junit
testImplementation testLibs.bundles.mockito
testImplementation testLibs.hamcrest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluatorResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.arns.Arn;

import javax.xml.transform.TransformerException;
import java.io.IOException;
Expand Down Expand Up @@ -367,6 +368,10 @@ public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
return s3Prefix+"/"+envSourceCoordinationIdentifier;
}

/**
 * Extracts the account ID component from a role ARN.
 *
 * NOTE(review): the argument is passed to {@link Arn#fromString(String)} without any
 * null or format check; presumably a malformed value makes that call throw — confirm
 * that callers expect this.
 *
 * @param roleArn the role ARN string to parse
 * @return the account ID held by the parsed ARN, or {@code null} when the ARN has none
 */
public String getAccountIdFromRole(final String roleArn) {
    final Arn parsedRoleArn = Arn.fromString(roleArn);
    return parsedRoleArn.accountId().orElse(null);
}

/**
* Invokes a method dynamically on a given object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
path_prefix: "${getMetadata(\"s3_partition_key\")}"
codec:
event_json:
default_bucket_owner: "<<FUNCTION_NAME:getAccountIdFromRole,PARAMETER:$.<<pipeline-name>>.source.documentdb.aws.sts_role_arn>>"
- s3:
routes:
- stream_load
Expand All @@ -46,7 +47,7 @@
path_prefix: "${getMetadata(\"s3_partition_key\")}"
codec:
event_json:

default_bucket_owner: "<<FUNCTION_NAME:getAccountIdFromRole,PARAMETER:$.<<pipeline-name>>.source.documentdb.aws.sts_role_arn>>"
"<<pipeline-name>>-s3":
workers: "<<$.<<pipeline-name>>.workers>>"
delay: "<<$.<<pipeline-name>>.delay>>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ void setUp() {
when(expressionEvaluator.isValidFormatExpression(anyString())).thenReturn(true);

when(s3SinkConfig.getDefaultBucket()).thenReturn(null);
when(s3SinkConfig.getBucketOwners()).thenReturn(null);
when(s3SinkConfig.getDefaultBucketOwner()).thenReturn(null);
}

private S3Sink createObjectUnderTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand Down Expand Up @@ -140,6 +141,9 @@ class S3SinkServiceIT {
@Mock
private ExpressionEvaluator expressionEvaluator;

@Mock
private BucketOwnerProvider bucketOwnerProvider;

private OutputCodec codec;
private KeyGenerator keyGenerator;

Expand Down Expand Up @@ -270,7 +274,7 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx
private S3SinkService createObjectUnderTest() {
OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3AsyncClient);
s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3AsyncClient, bucketOwnerProvider);

return new S3SinkService(s3SinkConfig, codecContext, Duration.ofSeconds(5), pluginMetrics, s3GroupManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@


import org.apache.parquet.io.PositionOutputStream;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
Expand Down Expand Up @@ -59,6 +60,9 @@ public class S3OutputStream extends PositionOutputStream {
private final byte[] buf;

private final S3AsyncClient s3Client;

private final BucketOwnerProvider bucketOwnerProvider;

/**
* Collection of the etags for the parts that have been uploaded
*/
Expand Down Expand Up @@ -93,7 +97,8 @@ public class S3OutputStream extends PositionOutputStream {
public S3OutputStream(final S3AsyncClient s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {
this.s3Client = s3Client;
this.bucket = bucketSupplier.get();
this.key = keySupplier.get();
Expand All @@ -103,6 +108,7 @@ public S3OutputStream(final S3AsyncClient s3Client,
open = true;
this.defaultBucket = defaultBucket;
this.executorService = Executors.newSingleThreadExecutor();
this.bucketOwnerProvider = bucketOwnerProvider;
}

@Override
Expand Down Expand Up @@ -191,6 +197,7 @@ public CompletableFuture<?> close(final Consumer<Boolean> runOnCompletion, final
.build();
CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucket)
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(defaultBucket).orElse(null))
.key(key)
.uploadId(uploadId)
.multipartUpload(completedMultipartUpload)
Expand Down Expand Up @@ -250,6 +257,7 @@ private void uploadPart() {
int partNumber = etags.size() + 1;
UploadPartRequest uploadRequest = UploadPartRequest.builder()
.bucket(bucket)
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(defaultBucket).orElse(null))
.key(key)
.uploadId(uploadId)
.partNumber(partNumber)
Expand Down Expand Up @@ -278,6 +286,7 @@ private void createMultipartUpload() {
CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(key)
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(bucket).orElse(null))
.build();
CompletableFuture<CreateMultipartUploadResponse> multipartUpload = s3Client.createMultipartUpload(uploadRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.ConfigBucketOwnerProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand Down Expand Up @@ -71,6 +73,8 @@ public S3Sink(final PluginSetting pluginSetting,
this.sinkContext = sinkContext;
final PluginModel codecConfiguration = s3SinkConfig.getCodec();
final CodecFactory codecFactory = new CodecFactory(pluginFactory, codecConfiguration);
final ConfigBucketOwnerProviderFactory configBucketOwnerProviderFactory = new ConfigBucketOwnerProviderFactory();
final BucketOwnerProvider bucketOwnerProvider = configBucketOwnerProviderFactory.createBucketOwnerProvider(s3SinkConfig);

final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
codecConfiguration.getPluginSettings());
Expand Down Expand Up @@ -112,7 +116,7 @@ public S3Sink(final PluginSetting pluginSetting,
testCodec.validateAgainstCodecContext(s3OutputCodecContext);

final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client);
final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client, bucketOwnerProvider);


s3SinkService = new S3SinkService(s3SinkConfig, s3OutputCodecContext, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.aws.validator.AwsAccountId;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
Expand All @@ -18,6 +19,8 @@
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;

import java.util.Map;

/**
* s3 sink configuration class contains properties, used to read yaml configuration.
*/
Expand Down Expand Up @@ -74,6 +77,13 @@ public class S3SinkConfig {
@JsonProperty("max_retries")
private int maxUploadRetries = DEFAULT_UPLOAD_RETRIES;

@JsonProperty("bucket_owners")
private Map<String, @AwsAccountId String> bucketOwners;

@JsonProperty("default_bucket_owner")
@AwsAccountId
private String defaultBucketOwner;

/**
* Aws Authentication configuration Options.
* @return aws authentication options.
Expand Down Expand Up @@ -154,4 +164,12 @@ public CompressionOption getCompression() {
}

public String getDefaultBucket() { return defaultBucket; }

/**
 * Returns the mapping bound to the {@code bucket_owners} configuration property.
 *
 * @return the configured map; its values are validated as AWS account IDs and its keys
 *         are presumably bucket names (TODO confirm). The field has no default value, so
 *         this is presumably {@code null} when the property is omitted.
 */
public Map<String, String> getBucketOwners() {
    return this.bucketOwners;
}

/**
 * Returns the value bound to the {@code default_bucket_owner} configuration property.
 *
 * @return the configured AWS account ID; the field has no default value, so this is
 *         presumably {@code null} when the property is omitted (TODO confirm)
 */
public String getDefaultBucketOwner() {
    return this.defaultBucketOwner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.function.Supplier;

/**
 * Factory for {@link Buffer} instances in the S3 sink accumulator package.
 *
 * A buffer is requested with the S3 async client, suppliers for the target bucket and
 * object key, and the name of the default bucket. The {@link BucketOwnerProvider}
 * argument is passed along so the created buffer can look up bucket owners.
 */
public interface BufferFactory {
Buffer getBuffer(S3AsyncClient s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier, String defaultBucket);
Buffer getBuffer(S3AsyncClient s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier, String defaultBucket, BucketOwnerProvider bucketOwnerProvider);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
Expand All @@ -25,26 +26,35 @@ class BufferUtilities {
static final String INVALID_BUCKET = "The specified bucket is not valid";

static CompletableFuture<PutObjectResponse> putObjectOrSendToDefaultBucket(final S3AsyncClient s3Client,
final AsyncRequestBody requestBody,
final Consumer<Boolean> runOnCompletion,
final Consumer<Throwable> runOnFailure,
final String objectKey,
final String targetBucket,
final String defaultBucket) {
final AsyncRequestBody requestBody,
final Consumer<Boolean> runOnCompletion,
final Consumer<Throwable> runOnFailure,
final String objectKey,
final String targetBucket,
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {

final boolean[] defaultBucketAttempted = new boolean[1];
return s3Client.putObject(
PutObjectRequest.builder().bucket(targetBucket).key(objectKey).build(), requestBody)
PutObjectRequest.builder()
.bucket(targetBucket)
.key(objectKey)
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(targetBucket).orElse(null))
.build(), requestBody)
.handle((result, ex) -> {
if (ex != null) {
runOnFailure.accept(ex);

if (defaultBucket != null &&
(ex instanceof NoSuchBucketException || ex.getMessage().contains(ACCESS_DENIED) || ex.getMessage().contains(INVALID_BUCKET))) {
(ex instanceof NoSuchBucketException || ex.getCause() instanceof NoSuchBucketException || ex.getMessage().contains(ACCESS_DENIED) || ex.getMessage().contains(INVALID_BUCKET))) {
LOG.warn("Bucket {} could not be accessed, attempting to send to default_bucket {}", targetBucket, defaultBucket);
defaultBucketAttempted[0] = true;
return s3Client.putObject(
PutObjectRequest.builder().bucket(defaultBucket).key(objectKey).build(),
PutObjectRequest.builder()
.bucket(defaultBucket)
.key(objectKey)
.expectedBucketOwner(bucketOwnerProvider.getBucketOwner(defaultBucket).orElse(null))
.build(),
requestBody);
} else {
runOnCompletion.accept(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.opensearch.dataprepper.plugins.sink.s3.codec.BufferedCodec;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.function.Supplier;
Expand All @@ -18,8 +19,9 @@ public CodecBufferFactory(BufferFactory innerBufferFactory, BufferedCodec codec)
public Buffer getBuffer(final S3AsyncClient s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket);
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {
Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider);
return new CodecBuffer(innerBuffer, bufferedCodec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Objects;
Expand All @@ -17,7 +18,9 @@ public class CompressionBufferFactory implements BufferFactory {
private final CompressionEngine compressionEngine;
private final boolean compressionInternal;

public CompressionBufferFactory(final BufferFactory innerBufferFactory, final CompressionEngine compressionEngine, final OutputCodec codec) {
public CompressionBufferFactory(final BufferFactory innerBufferFactory,
final CompressionEngine compressionEngine,
final OutputCodec codec) {
this.innerBufferFactory = Objects.requireNonNull(innerBufferFactory);
this.compressionEngine = Objects.requireNonNull(compressionEngine);
compressionInternal = Objects.requireNonNull(codec).isCompressionInternal();
Expand All @@ -27,8 +30,9 @@ public CompressionBufferFactory(final BufferFactory innerBufferFactory, final Co
public Buffer getBuffer(final S3AsyncClient s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket);
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {
final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket, bucketOwnerProvider);
if(compressionInternal)
return internalBuffer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.apache.commons.lang3.time.StopWatch;
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;

Expand All @@ -28,6 +29,8 @@ public class InMemoryBuffer implements Buffer {
private final S3AsyncClient s3Client;
private final Supplier<String> bucketSupplier;
private final Supplier<String> keySupplier;

private final BucketOwnerProvider bucketOwnerProvider;
private int eventCount;
private final StopWatch watch;
private boolean isCodecStarted;
Expand All @@ -39,7 +42,8 @@ public class InMemoryBuffer implements Buffer {
InMemoryBuffer(final S3AsyncClient s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
final String defaultBucket,
final BucketOwnerProvider bucketOwnerProvider) {
this.s3Client = s3Client;
this.bucketSupplier = bucketSupplier;
this.keySupplier = keySupplier;
Expand All @@ -49,6 +53,7 @@ public class InMemoryBuffer implements Buffer {
watch.start();
isCodecStarted = false;
this.defaultBucket = defaultBucket;
this.bucketOwnerProvider = bucketOwnerProvider;
}

@Override
Expand All @@ -73,7 +78,7 @@ public Optional<CompletableFuture<?>> flushToS3(final Consumer<Boolean> consumeO
final byte[] byteArray = byteArrayOutputStream.toByteArray();
return Optional.ofNullable(BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, AsyncRequestBody.fromBytes(byteArray),
consumeOnCompletion, consumeOnException,
getKey(), getBucket(), defaultBucket));
getKey(), getBucket(), defaultBucket, bucketOwnerProvider));
}

private String getBucket() {
Expand Down
Loading

0 comments on commit f5b0fee

Please sign in to comment.