From 97a90e38d197ae4260cae06263869a9f817a0ab9 Mon Sep 17 00:00:00 2001 From: Aiden Dai Date: Mon, 6 Nov 2023 14:45:31 +0800 Subject: [PATCH] Add CMK encryption support to DynamoDB export Signed-off-by: Aiden Dai --- .../dynamodb-source/README.md | 2 ++ .../source/dynamodb/DynamoDBService.java | 11 ++++++-- .../dynamodb/configuration/ExportConfig.java | 17 ++++++++++- .../state/ExportProgressState.java | 13 ++++++++- .../dynamodb/export/DataFileLoader.java | 15 ++++------ .../dynamodb/export/ExportScheduler.java | 6 ++-- .../dynamodb/export/ExportTaskManager.java | 9 ++++-- .../dynamodb/export/ManifestFileReader.java | 28 ++++++++----------- .../source/dynamodb/model/TableMetadata.java | 15 +++++++++- .../dynamodb/export/DataFileLoaderTest.java | 6 ++-- 10 files changed, 81 insertions(+), 41 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/README.md b/data-prepper-plugins/dynamodb-source/README.md index 1585e66e12..db620a6406 100644 --- a/data-prepper-plugins/dynamodb-source/README.md +++ b/data-prepper-plugins/dynamodb-source/README.md @@ -52,6 +52,8 @@ source: * s3_bucket (Required): The destination bucket to store the exported data files * s3_prefix (Optional): Custom prefix. +* s3_sse_kms_key_id (Optional): A AWS KMS Customer Managed Key (CMK) to encrypt the export data files. The key id will + be the ARN of the Key, e.g. arn:aws:kms:us-west-2:123456789012:key/0a4bc22f-bb96-4ad4-80ca-63b12b3ec147 ### Stream Configurations diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java index 1916a5344f..fed9e83f79 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java @@ -171,7 +171,12 @@ public void init() { Instant startTime = Instant.now(); if (tableInfo.getMetadata().isExportRequired()) { - createExportPartition(tableInfo.getTableArn(), startTime, tableInfo.getMetadata().getExportBucket(), tableInfo.getMetadata().getExportPrefix()); + createExportPartition( + tableInfo.getTableArn(), + startTime, + tableInfo.getMetadata().getExportBucket(), + tableInfo.getMetadata().getExportPrefix(), + tableInfo.getMetadata().getExportKmsKeyId()); } if (tableInfo.getMetadata().isStreamRequired()) { @@ -209,11 +214,12 @@ public void init() { * @param bucket Export bucket * @param prefix Export Prefix */ - private void createExportPartition(String tableArn, Instant exportTime, String bucket, String prefix) { + private void createExportPartition(String tableArn, Instant exportTime, String bucket, String prefix, String kmsKeyId) { ExportProgressState exportProgressState = new ExportProgressState(); exportProgressState.setBucket(bucket); exportProgressState.setPrefix(prefix); exportProgressState.setExportTime(exportTime.toString()); // information purpose + exportProgressState.setKmsKeyId(kmsKeyId); ExportPartition exportPartition = new ExportPartition(tableArn, exportTime, Optional.of(exportProgressState)); coordinator.createPartition(exportPartition); } @@ -310,6 +316,7 @@ private TableInfo getTableInfo(TableConfig tableConfig) { .streamStartPosition(streamStartPosition) .exportBucket(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Bucket()) .exportPrefix(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Prefix()) + .exportKmsKeyId(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3SseKmsKeyId()) .build(); return new TableInfo(tableConfig.getTableArn(), metadata); } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java index d34017599a..dd1aac12a7 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/ExportConfig.java @@ -6,7 +6,9 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.configuration; import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotBlank; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.regions.Region; public class ExportConfig { @@ -14,13 +16,15 @@ public class ExportConfig { @JsonProperty("s3_bucket") @NotBlank(message = "Bucket Name is required for export") private String s3Bucket; - @JsonProperty("s3_prefix") private String s3Prefix; @JsonProperty("s3_region") private String s3Region; + @JsonProperty("s3_sse_kms_key_id") + private String s3SseKmsKeyId; + public String getS3Bucket() { return s3Bucket; } @@ -33,4 +37,15 @@ public Region getAwsRegion() { return s3Region != null ? Region.of(s3Region) : null; } + public String getS3SseKmsKeyId() { + return s3SseKmsKeyId; + } + + @AssertTrue(message = "KMS Key ID must be a valid one.") + boolean isKmsKeyIdValid() { + // If key id is provided, it should be in a format like + // arn:aws:kms:us-west-2:123456789012:key/0a4bc22f-bb96-4ad3-80ca-63b12b3ec147 + return s3SseKmsKeyId == null || Arn.fromString(s3SseKmsKeyId).resourceAsString() != null; + } + } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/ExportProgressState.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/ExportProgressState.java index 4aa15cacf5..e7dadde99c 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/ExportProgressState.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/state/ExportProgressState.java @@ -21,10 +21,13 @@ public class ExportProgressState { @JsonProperty("prefix") private String prefix; + @JsonProperty("kmsKeyId") + private String kmsKeyId; + @JsonProperty("exportTime") private String exportTime; - + public String getExportArn() { return exportArn; } @@ -64,4 +67,12 @@ public String getStatus() { public void setStatus(String status) { this.status = status; } + + public String getKmsKeyId() { + return kmsKeyId; + } + + public void setKmsKeyId(String kmsKeyId) { + this.kmsKeyId = kmsKeyId; + } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java index 56a2065055..6093d5d688 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java @@ -7,10 +7,10 @@ import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.slf4j.Logger; @@ -169,14 +169,12 @@ public void run() { int lineCount = 0; int lastLineProcessed = 0; - try { - InputStream inputStream = objectReader.readFile(bucketName, key); - GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream); - BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream)); + try (InputStream inputStream = objectReader.readFile(bucketName, key); + GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream); + BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream))) { String line; while ((line = reader.readLine()) != null) { - if (shouldStop) { checkpointer.checkpoint(lastLineProcessed); LOG.debug("Should Stop flag is set to True, looks like shutdown has triggered"); @@ -213,9 +211,6 @@ public void run() { } lines.clear(); - reader.close(); - gzipInputStream.close(); - inputStream.close(); LOG.info("Completed loading s3://{}/{} to buffer", bucketName, key); @@ -226,7 +221,7 @@ public void run() { } catch (Exception e) { checkpointer.checkpoint(lineCount); - String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %S", bucketName, key, e.getMessage()); + String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %s", bucketName, key, e.getMessage()); throw new RuntimeException(errorMessage); } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java index 9ace3aa6ae..c150643d56 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java @@ -156,8 +156,7 @@ private BiConsumer completeExport(ExportPartition exportParti ExportProgressState state = exportPartition.getProgressState().get(); String bucketName = state.getBucket(); String exportArn = state.getExportArn(); - - + String manifestKey = exportTaskManager.getExportManifest(exportArn); LOG.debug("Export manifest summary file is " + manifestKey); @@ -189,6 +188,7 @@ private void createDataFilePartitions(String exportArn, String bucketName, Map parseDataFile(String bucket, String key) { LOG.info("Try to read the manifest data file"); Map result = new HashMap<>(); - InputStream object = objectReader.readFile(bucket, key); - BufferedReader reader = new BufferedReader(new InputStreamReader(object)); - - String line; - try { + + try (InputStream object = objectReader.readFile(bucket, key); + BufferedReader reader = new BufferedReader(new InputStreamReader(object))) { + String line; while ((line = reader.readLine()) != null) { // An example line as below: // {"itemCount":46331,"md5Checksum":"a0k21IY3eelgr2PuWJLjJw==","etag":"51f9f394903c5d682321c6211aae8b6a-1","dataFileS3Key":"test-table-export/AWSDynamoDB/01692350182719-6de2c037/data/fpgzwz7ome3s7a5gqn2mu3ogtq.json.gz"} @@ -76,8 +69,9 @@ public Map parseDataFile(String bucket, String key) { result.put(map.get(DATA_FILE_S3_KEY), Integer.valueOf(map.get(DATA_FILE_ITEM_COUNT_KEY))); } - } catch (IOException e) { - LOG.error("IO Exception due to {}", e.getMessage()); + } catch (Exception e) { + LOG.error("Exception due to {}", e.getMessage()); + throw new RuntimeException(e); } return result; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/TableMetadata.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/TableMetadata.java index 8415a41001..ba21304d31 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/TableMetadata.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/TableMetadata.java @@ -19,7 +19,6 @@ public class TableMetadata { private static final String REQUIRE_EXPORT_KEY = "export"; private static final String REQUIRE_STREAM_KEY = "stream"; - private final String partitionKeyAttributeName; private final String sortKeyAttributeName; @@ -36,6 +35,8 @@ public class TableMetadata { private final String exportPrefix; + private final String exportKmsKeyId; + private TableMetadata(Builder builder) { this.partitionKeyAttributeName = builder.partitionKeyAttributeName; this.sortKeyAttributeName = builder.sortKeyAttributeName; @@ -45,6 +46,7 @@ private TableMetadata(Builder builder) { this.exportBucket = builder.exportBucket; this.exportPrefix = builder.exportPrefix; this.streamStartPosition = builder.streamStartPosition; + this.exportKmsKeyId = builder.exportKmsKeyId; } @@ -70,6 +72,8 @@ public static class Builder { private String exportPrefix; + private String exportKmsKeyId; + private StreamStartPosition streamStartPosition; @@ -108,6 +112,11 @@ public Builder exportPrefix(String exportPrefix) { return this; } + public Builder exportKmsKeyId(String exportKmsKeyId) { + this.exportKmsKeyId = exportKmsKeyId; + return this; + } + public Builder streamStartPosition(StreamStartPosition streamStartPosition) { this.streamStartPosition = streamStartPosition; return this; @@ -173,4 +182,8 @@ public String getExportBucket() { public String getExportPrefix() { return exportPrefix; } + + public String getExportKmsKeyId() { + return exportKmsKeyId; + } } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java index a92b80645b..162cef81b3 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java @@ -93,7 +93,7 @@ class DataFileLoaderTest { private final Random random = new Random(); - private final int total = random.nextInt(10); + private final int total = random.nextInt(10) + 1; @BeforeEach void setup() { @@ -153,8 +153,8 @@ void test_run_loadFile_correctly() { try ( final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class); final MockedConstruction recordConverterMockedConstruction = mockConstruction(ExportRecordConverter.class, (mock, context) -> { - exportRecordConverter = mock; - })) { + exportRecordConverter = mock; + })) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer) .bucketName(bucketName)