Add CMK encryption support to DynamoDB export #3592

Merged 1 commit on Nov 7, 2023
2 changes: 2 additions & 0 deletions data-prepper-plugins/dynamodb-source/README.md
@@ -52,6 +52,8 @@ source:

* s3_bucket (Required): The destination bucket to store the exported data files
* s3_prefix (Optional): Custom prefix.
* s3_sse_kms_key_id (Optional): An AWS KMS Customer Managed Key (CMK) to encrypt the export data files. The key id will

Member:

We have at least three options starting with s3_. It may be ideal to make this s3: since we haven't yet released this feature.

s3:
  bucket: my-bucket
  prefix: /custom/prefix
  sse_kms_key_id: arn:...

Contributor Author (@daixba), Nov 7, 2023:

It may be confusing this way; it would suggest that more options are supported apart from S3.

So I think it's better if we just align this with the API structure: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ExportTableToPointInTime.html

Member:

That makes sense - I'm ok with this.

be the ARN of the Key, e.g. arn:aws:kms:us-west-2:123456789012:key/0a4bc22f-bb96-4ad4-80ca-63b12b3ec147
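
For illustration, a minimal sketch of how the new option might be used in an export configuration (the export nesting and the bucket/prefix values are assumptions for this example; the key ARN is the placeholder format from the line above):

export:
  s3_bucket: my-export-bucket
  s3_prefix: custom/prefix
  s3_sse_kms_key_id: arn:aws:kms:us-west-2:123456789012:key/0a4bc22f-bb96-4ad4-80ca-63b12b3ec147

When s3_sse_kms_key_id is omitted or empty, the export falls back to the S3-managed AES256 algorithm, as the submitExportJob change further down shows.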

### Stream Configurations

@@ -171,7 +171,12 @@ public void init() {
Instant startTime = Instant.now();

if (tableInfo.getMetadata().isExportRequired()) {
createExportPartition(tableInfo.getTableArn(), startTime, tableInfo.getMetadata().getExportBucket(), tableInfo.getMetadata().getExportPrefix());
createExportPartition(
tableInfo.getTableArn(),
startTime,
tableInfo.getMetadata().getExportBucket(),
tableInfo.getMetadata().getExportPrefix(),
tableInfo.getMetadata().getExportKmsKeyId());
}

if (tableInfo.getMetadata().isStreamRequired()) {
@@ -209,11 +214,12 @@ public void init() {
* @param bucket Export bucket
* @param prefix Export Prefix
*/
private void createExportPartition(String tableArn, Instant exportTime, String bucket, String prefix) {
private void createExportPartition(String tableArn, Instant exportTime, String bucket, String prefix, String kmsKeyId) {
ExportProgressState exportProgressState = new ExportProgressState();
exportProgressState.setBucket(bucket);
exportProgressState.setPrefix(prefix);
exportProgressState.setExportTime(exportTime.toString()); // information purpose
exportProgressState.setKmsKeyId(kmsKeyId);
ExportPartition exportPartition = new ExportPartition(tableArn, exportTime, Optional.of(exportProgressState));
coordinator.createPartition(exportPartition);
}
@@ -310,6 +316,7 @@ private TableInfo getTableInfo(TableConfig tableConfig) {
.streamStartPosition(streamStartPosition)
.exportBucket(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Bucket())
.exportPrefix(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Prefix())
.exportKmsKeyId(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3SseKmsKeyId())
.build();
return new TableInfo(tableConfig.getTableArn(), metadata);
}
@@ -6,21 +6,25 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotBlank;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;

public class ExportConfig {

@JsonProperty("s3_bucket")
@NotBlank(message = "Bucket Name is required for export")
private String s3Bucket;

@JsonProperty("s3_prefix")
private String s3Prefix;

@JsonProperty("s3_region")
private String s3Region;

@JsonProperty("s3_sse_kms_key_id")
private String s3SseKmsKeyId;

public String getS3Bucket() {
return s3Bucket;
}
@@ -33,4 +37,15 @@ public Region getAwsRegion() {
return s3Region != null ? Region.of(s3Region) : null;
}

public String getS3SseKmsKeyId() {
return s3SseKmsKeyId;
}

@AssertTrue(message = "KMS Key ID must be a valid one.")
boolean isKmsKeyIdValid() {
// If key id is provided, it should be in a format like
// arn:aws:kms:us-west-2:123456789012:key/0a4bc22f-bb96-4ad3-80ca-63b12b3ec147
return s3SseKmsKeyId == null || Arn.fromString(s3SseKmsKeyId).resourceAsString() != null;

Member:

I'm not sure we really want this validation to be so strict. Users should be allowed to provide KMS keys as aliases. e.g. alias/MyAliasedKey.

Contributor Author (@daixba), Nov 7, 2023:

I don't agree with this. Even in the DynamoDB console, there is no way to provide an alias; you can only provide a KMS Key ID. Note that an alias is optional for a KMS CMK. I believe you want more flexibility here, but to get the Key ID (which is used in the API call) from an alias, we would need more permissions (maybe describe keys or something).

Contributor Author (@daixba):

Also, an alias may not work for multi-region keys.

Member:

@daixba, typically AWS services allow specifying either an alias or an ARN. So my ask is not to require the user to provide an alias; it is to allow the user to provide either an ARN or an alias.

The documentation on this is unclear, but the restrictions indicate a minimum length of 1, which is compatible with an alias.

We shouldn't follow the Console, but what the API itself allows. If this API does not permit an alias, then I agree this should be ARN-only. But it appears to support the alias.

Member:

We should probably wrap this in a try/catch to avoid NPEs and other exceptions being thrown here for an invalid ARN format, but this can be added as a follow-on.

}

}
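
For illustration, a minimal sketch of the try/catch follow-on suggested in the review thread above (not part of this PR; the broad catch and the stub class around the validator are assumptions):

import jakarta.validation.constraints.AssertTrue;
import software.amazon.awssdk.arns.Arn;

class ExportConfigSketch {

    private String s3SseKmsKeyId;

    // Treat any parse failure as invalid input instead of letting
    // Arn.fromString() throw out of the bean-validation method.
    @AssertTrue(message = "KMS Key ID must be a valid one.")
    boolean isKmsKeyIdValid() {
        if (s3SseKmsKeyId == null) {
            return true;
        }
        try {
            return Arn.fromString(s3SseKmsKeyId).resourceAsString() != null;
        } catch (final Exception e) {
            return false;
        }
    }
}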
@@ -21,10 +21,13 @@ public class ExportProgressState {
@JsonProperty("prefix")
private String prefix;

@JsonProperty("kmsKeyId")
private String kmsKeyId;

@JsonProperty("exportTime")
private String exportTime;


public String getExportArn() {
return exportArn;
}
@@ -64,4 +67,12 @@ public String getStatus() {
public void setStatus(String status) {
this.status = status;
}

public String getKmsKeyId() {
return kmsKeyId;
}

public void setKmsKeyId(String kmsKeyId) {
this.kmsKeyId = kmsKeyId;
}
}
@@ -7,10 +7,10 @@

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
@@ -169,14 +169,12 @@ public void run() {
int lineCount = 0;
int lastLineProcessed = 0;

try {
InputStream inputStream = objectReader.readFile(bucketName, key);
GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream));
try (InputStream inputStream = objectReader.readFile(bucketName, key);
GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream))) {

String line;
while ((line = reader.readLine()) != null) {

if (shouldStop) {
checkpointer.checkpoint(lastLineProcessed);
LOG.debug("Should Stop flag is set to True, looks like shutdown has triggered");
@@ -213,9 +211,6 @@ public void run() {
}

lines.clear();
reader.close();
gzipInputStream.close();
inputStream.close();

LOG.info("Completed loading s3://{}/{} to buffer", bucketName, key);

@@ -226,7 +221,7 @@ public void run() {
} catch (Exception e) {
checkpointer.checkpoint(lineCount);

String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %S", bucketName, key, e.getMessage());
String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %s", bucketName, key, e.getMessage());
throw new RuntimeException(errorMessage);
}
}
@@ -156,8 +156,7 @@ private BiConsumer<String, Throwable> completeExport(ExportPartition exportParti
ExportProgressState state = exportPartition.getProgressState().get();
String bucketName = state.getBucket();
String exportArn = state.getExportArn();



String manifestKey = exportTaskManager.getExportManifest(exportArn);
LOG.debug("Export manifest summary file is " + manifestKey);

@@ -189,6 +188,7 @@ private void createDataFilePartitions(String exportArn, String bucketName, Map<S
DataFileProgressState progressState = new DataFileProgressState();
progressState.setTotal(size);
progressState.setLoaded(0);

totalFiles.addAndGet(1);
totalRecords.addAndGet(size);
DataFilePartition partition = new DataFilePartition(exportArn, bucketName, key, Optional.of(progressState));
@@ -260,7 +260,7 @@ private String getOrCreateExportArn(ExportPartition exportPartition) {

LOG.info("Try to submit a new export job for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
// submit a new export request
String exportArn = exportTaskManager.submitExportJob(exportPartition.getTableArn(), state.getBucket(), state.getPrefix(), exportPartition.getExportTime());
String exportArn = exportTaskManager.submitExportJob(exportPartition.getTableArn(), state.getBucket(), state.getPrefix(), state.getKmsKeyId(), exportPartition.getExportTime());

// Update state with export Arn in the coordination table.
// So that it won't be submitted again after a restart.
@@ -14,6 +14,7 @@
import software.amazon.awssdk.services.dynamodb.model.ExportFormat;
import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeRequest;
import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeResponse;
import software.amazon.awssdk.services.dynamodb.model.S3SseAlgorithm;

import java.time.Instant;

@@ -30,12 +31,15 @@ public ExportTaskManager(DynamoDbClient dynamoDBClient) {
this.dynamoDBClient = dynamoDBClient;
}

public String submitExportJob(String tableArn, String bucketName, String prefix, Instant exportTime) {
public String submitExportJob(String tableArn, String bucket, String prefix, String kmsKeyId, Instant exportTime) {
S3SseAlgorithm algorithm = kmsKeyId == null || kmsKeyId.isEmpty() ? S3SseAlgorithm.AES256 : S3SseAlgorithm.KMS;
// No needs to use a client token here.
ExportTableToPointInTimeRequest req = ExportTableToPointInTimeRequest.builder()
.tableArn(tableArn)
.s3Bucket(bucketName)
.s3Bucket(bucket)
.s3Prefix(prefix)
.s3SseAlgorithm(algorithm)
.s3SseKmsKeyId(kmsKeyId)
.exportFormat(DEFAULT_EXPORT_FORMAT)
.exportTime(exportTime)
.build();
@@ -46,7 +50,6 @@ public String submitExportJob(String tableArn, String bucketName, String prefix,

String exportArn = response.exportDescription().exportArn();
String status = response.exportDescription().exportStatusAsString();

LOG.debug("Export Job submitted with ARN {} and status {}", exportArn, status);
return exportArn;
} catch (SdkException e) {
@@ -5,15 +5,13 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
@@ -39,22 +37,18 @@ public ManifestFileReader(S3ObjectReader objectReader) {

public ExportSummary parseSummaryFile(String bucket, String key) {
LOG.debug("Try to read the manifest summary file");
InputStream object = objectReader.readFile(bucket, key);

BufferedReader reader = new BufferedReader(new InputStreamReader(object));
try {
try (InputStream object = objectReader.readFile(bucket, key);
BufferedReader reader = new BufferedReader(new InputStreamReader(object))) {
// Only one line
String line = reader.readLine();
LOG.debug("Manifest summary: {}", line);
ExportSummary summaryInfo = MAPPER.readValue(line, ExportSummary.class);
return summaryInfo;

} catch (JsonProcessingException e) {
} catch (Exception e) {
LOG.error("Failed to parse the summary info due to {}", e.getMessage());
throw new RuntimeException(e);

} catch (IOException e) {
LOG.error("IO Exception due to {}", e.getMessage());
throw new RuntimeException(e);
}

}
@@ -63,11 +57,10 @@ public Map<String, Integer> parseDataFile(String bucket, String key) {
LOG.info("Try to read the manifest data file");

Map<String, Integer> result = new HashMap<>();
InputStream object = objectReader.readFile(bucket, key);
BufferedReader reader = new BufferedReader(new InputStreamReader(object));

String line;
try {

try (InputStream object = objectReader.readFile(bucket, key);
BufferedReader reader = new BufferedReader(new InputStreamReader(object))) {
String line;
while ((line = reader.readLine()) != null) {
// An example line as below:
// {"itemCount":46331,"md5Checksum":"a0k21IY3eelgr2PuWJLjJw==","etag":"51f9f394903c5d682321c6211aae8b6a-1","dataFileS3Key":"test-table-export/AWSDynamoDB/01692350182719-6de2c037/data/fpgzwz7ome3s7a5gqn2mu3ogtq.json.gz"}
Expand All @@ -76,8 +69,9 @@ public Map<String, Integer> parseDataFile(String bucket, String key) {
result.put(map.get(DATA_FILE_S3_KEY), Integer.valueOf(map.get(DATA_FILE_ITEM_COUNT_KEY)));

}
} catch (IOException e) {
LOG.error("IO Exception due to {}", e.getMessage());
} catch (Exception e) {
LOG.error("Exception due to {}", e.getMessage());
throw new RuntimeException(e);
}

return result;
@@ -19,7 +19,6 @@ public class TableMetadata {
private static final String REQUIRE_EXPORT_KEY = "export";
private static final String REQUIRE_STREAM_KEY = "stream";


private final String partitionKeyAttributeName;

private final String sortKeyAttributeName;
@@ -36,6 +35,8 @@ public class TableMetadata {

private final String exportPrefix;

private final String exportKmsKeyId;

private TableMetadata(Builder builder) {
this.partitionKeyAttributeName = builder.partitionKeyAttributeName;
this.sortKeyAttributeName = builder.sortKeyAttributeName;
@@ -45,6 +46,7 @@ private TableMetadata(Builder builder) {
this.exportBucket = builder.exportBucket;
this.exportPrefix = builder.exportPrefix;
this.streamStartPosition = builder.streamStartPosition;
this.exportKmsKeyId = builder.exportKmsKeyId;

}

@@ -70,6 +72,8 @@ public static class Builder {

private String exportPrefix;

private String exportKmsKeyId;

private StreamStartPosition streamStartPosition;


@@ -108,6 +112,11 @@ public Builder exportPrefix(String exportPrefix) {
return this;
}

public Builder exportKmsKeyId(String exportKmsKeyId) {
this.exportKmsKeyId = exportKmsKeyId;
return this;
}

public Builder streamStartPosition(StreamStartPosition streamStartPosition) {
this.streamStartPosition = streamStartPosition;
return this;
@@ -173,4 +182,8 @@ public String getExportBucket() {
public String getExportPrefix() {
return exportPrefix;
}

public String getExportKmsKeyId() {
return exportKmsKeyId;
}
}
@@ -93,7 +93,7 @@ class DataFileLoaderTest {

private final Random random = new Random();

private final int total = random.nextInt(10);
private final int total = random.nextInt(10) + 1;

@BeforeEach
void setup() {
@@ -153,8 +153,8 @@ void test_run_loadFile_correctly() {
try (
final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class);
final MockedConstruction<ExportRecordConverter> recordConverterMockedConstruction = mockConstruction(ExportRecordConverter.class, (mock, context) -> {
exportRecordConverter = mock;
})) {
exportRecordConverter = mock;
})) {
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer)
.bucketName(bucketName)
Expand Down