From a9a738bea34856d5b0cd5eb9f6c7f1d939609dbe Mon Sep 17 00:00:00 2001 From: Srikanth Govindarajan Date: Fri, 28 Jun 2024 00:22:25 -0700 Subject: [PATCH 1/2] Lambda sink refactor Signed-off-by: Srikanth Govindarajan --- data-prepper-plugins/aws-lambda/README.md | 72 +++++++++++++++++++ .../{lambda => aws-lambda}/build.gradle | 14 ++-- .../lambda/sink}/LambdaSinkServiceIT.java | 4 +- .../lambda/common/accumlator/Buffer.java | 14 ++++ .../common/accumlator/BufferFactory.java | 0 .../common/accumlator/InMemoryBuffer.java | 60 ++++++++++++++-- .../accumlator/InMemoryBufferFactory.java | 0 .../lambda/common/codec/LambdaJsonCodec.java | 0 .../config/AwsAuthenticationOptions.java | 0 .../lambda/common/config/BatchOptions.java | 0 .../common/config/ThresholdOptions.java | 2 +- .../lambda/common/util/ThresholdCheck.java | 0 .../lambda/sink/LambdaClientFactory.java | 0 .../plugins/lambda/sink/LambdaSink.java | 2 +- .../plugins/lambda/sink/LambdaSinkConfig.java | 0 .../lambda/sink/LambdaSinkService.java | 0 .../lambda/sink/dlq/DlqPushHandler.java | 0 .../sink/dlq/LambdaSinkFailedDlqData.java | 0 .../lambda/common/ThresholdCheckTest.java | 0 .../InMemoryBufferFactoryTest.java | 0 .../accumulator/InMemoryBufferTest.java | 0 .../common/codec/LambdaJsonCodecTest.java | 0 .../common/config/ThresholdOptionsTest.java | 2 +- .../lambda/sink/LambdaClientFactoryTest.java | 0 .../lambda/sink/LambdaSinkConfigTest.java | 0 .../lambda/sink/LambdaSinkServiceTest.java | 0 .../plugins/lambda/sink/LambdaSinkTest.java | 2 +- .../lambda/sink/dlq/DlqPushHandlerTest.java | 0 .../org.mockito.plugins.MockMaker | 3 + .../test/resources/simplelogger.properties | 0 data-prepper-plugins/lambda/README.md | 36 ---------- settings.gradle | 2 +- 32 files changed, 160 insertions(+), 53 deletions(-) create mode 100644 data-prepper-plugins/aws-lambda/README.md rename data-prepper-plugins/{lambda => aws-lambda}/build.gradle (73%) rename data-prepper-plugins/{lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda => aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkServiceIT.java (97%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java (67%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java (64%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java (95%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java (97%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java (93%) rename data-prepper-plugins/{lambda => aws-lambda}/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java (100%) rename data-prepper-plugins/{lambda => aws-lambda}/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java (98%) rename data-prepper-plugins/{lambda => aws-lambda}/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java (100%) create mode 100644 data-prepper-plugins/aws-lambda/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker rename data-prepper-plugins/{lambda => aws-lambda}/src/test/resources/simplelogger.properties (100%) delete mode 100644 data-prepper-plugins/lambda/README.md diff --git a/data-prepper-plugins/aws-lambda/README.md b/data-prepper-plugins/aws-lambda/README.md new file mode 100644 index 0000000000..d995bc5202 --- /dev/null +++ b/data-prepper-plugins/aws-lambda/README.md @@ -0,0 +1,72 @@ + +# Lambda Processor + +This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing. + +## Usage +```aidl +lambda-pipeline: +... + processor: + - aws_lambda: + aws: + region: "us-east-1" + sts_role_arn: "" + function_name: "uploadToS3Lambda" + max_retries: 3 + mode: "synchronous" + batch: + batch_key: "osi_key" + threshold: + event_count: 3 + maximum_size: 6mb + event_collect_timeout: 15s +``` + +## Developer Guide + +The integration tests for this plugin do not run as part of the Data Prepper build. +The following command runs the integration tests: + +``` +./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.processor.lambda.region="us-east-1" -Dtests.processor.lambda.functionName="lambda_test_function" -Dtests.processor.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role + +``` + + +# Lambda Sink + +This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing. + +## Usage +```aidl +lambda-pipeline: +... + sink: + - aws_lambda: + aws: + region: "us-east-1" + sts_role_arn: "" + function_name: "uploadToS3Lambda" + max_retries: 3 + batch: + batch_key: "osi_key" + threshold: + event_count: 3 + maximum_size: 6mb + event_collect_timeout: 15s + dlq: + s3: + bucket: test-bucket + key_path_prefix: dlq/ +``` + +## Developer Guide + +The integration tests for this plugin do not run as part of the Data Prepper build. +The following command runs the integration tests: + +``` +./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.sink.lambda.region="us-east-1" -Dtests.sink.lambda.functionName="lambda_test_function" -Dtests.sink.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role + +``` diff --git a/data-prepper-plugins/lambda/build.gradle b/data-prepper-plugins/aws-lambda/build.gradle similarity index 73% rename from data-prepper-plugins/lambda/build.gradle rename to data-prepper-plugins/aws-lambda/build.gradle index 8447c3abdf..d59f4fd066 100644 --- a/data-prepper-plugins/lambda/build.gradle +++ b/data-prepper-plugins/aws-lambda/build.gradle @@ -26,8 +26,10 @@ dependencies { testAnnotationProcessor 'org.projectlombok:lombok:1.18.20' testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' testImplementation project(':data-prepper-test-common') - testImplementation project(':data-prepper-plugins:parse-json-processor') testImplementation testLibs.slf4j.simple + testImplementation 'org.mockito:mockito-core:4.6.1' + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.2' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.2' } test { @@ -59,9 +61,13 @@ task integrationTest(type: Test) { classpath = sourceSets.integrationTest.runtimeClasspath systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties' - systemProperty 'tests.sink.lambda.region', System.getProperty('tests.sink.lambda.region') - systemProperty 'tests.sink.lambda.functionName', System.getProperty('tests.sink.lambda.functionName') - systemProperty 'tests.sink.lambda.sts_role_arn', System.getProperty('tests.sink.lambda.sts_role_arn') + systemProperty 'tests.lambda.sink.region', System.getProperty('tests.lambda.sink.region') + systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName') + systemProperty 'tests.lambda.sink.sts_role_arn', System.getProperty('tests.lambda.sink.sts_role_arn') + + systemProperty 'tests.lambda.processor.region', System.getProperty('tests.lambda.processor.region') + systemProperty 'tests.lambda.processor.functionName', System.getProperty('tests.lambda.processor.functionName') + systemProperty 'tests.lambda.processor.sts_role_arn', System.getProperty('tests.lambda.processor.sts_role_arn') filter { includeTestsMatching '*IT' diff --git a/data-prepper-plugins/lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceIT.java similarity index 97% rename from data-prepper-plugins/lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java rename to data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceIT.java index 76fb4831ce..1a7e169a47 100644 --- a/data-prepper-plugins/lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java +++ b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceIT.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,8 +35,6 @@ import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; -import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkConfig; -import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkService; import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaClient; diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java similarity index 67% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java index f52a8e5de0..a2c5dde4a9 100644 --- a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java @@ -31,4 +31,18 @@ public interface Buffer { SdkBytes getPayload(); void setEventCount(int eventCount); + + //Metrics + public Duration getFlushLambdaSyncLatencyMetric(); + + public Long getPayloadRequestSyncSize(); + + public Duration getFlushLambdaAsyncLatencyMetric(); + + public Long getPayloadResponseSyncSize(); + + public Long getPayloadRequestAsyncSize(); + + public Long getPayloadResponseAsyncSize(); + } diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java similarity index 64% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java index 5d9d5a5134..095e6f47b2 100644 --- a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java @@ -12,6 +12,7 @@ import software.amazon.awssdk.services.lambda.LambdaClient; import software.amazon.awssdk.services.lambda.model.InvokeRequest; import software.amazon.awssdk.services.lambda.model.InvokeResponse; +import software.amazon.awssdk.services.lambda.model.LambdaException; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -31,7 +32,13 @@ public class InMemoryBuffer implements Buffer { private final String invocationType; private int eventCount; private final StopWatch watch; + private final StopWatch lambdaSyncLatencyWatch; + private final StopWatch lambdaAsyncLatencyWatch; private boolean isCodecStarted; + private long payloadRequestSyncSize; + private long payloadResponseSyncSize; + private long payloadRequestAsyncSize; + private long payloadResponseAsyncSize; public InMemoryBuffer(LambdaClient lambdaClient, String functionName, String invocationType) { @@ -44,6 +51,12 @@ public InMemoryBuffer(LambdaClient lambdaClient, String functionName, String inv watch = new StopWatch(); watch.start(); isCodecStarted = false; + lambdaSyncLatencyWatch = new StopWatch(); + lambdaAsyncLatencyWatch = new StopWatch(); + payloadRequestSyncSize = 0; + payloadResponseSyncSize = 0; + payloadRequestAsyncSize = 0; + payloadResponseAsyncSize =0; } @Override @@ -65,6 +78,7 @@ public Duration getDuration() { public void flushToLambdaAsync() { InvokeResponse resp; SdkBytes payload = getPayload(); + payloadRequestAsyncSize = payload.asByteArray().length; // Setup an InvokeRequest. InvokeRequest request = InvokeRequest.builder() @@ -73,13 +87,17 @@ public void flushToLambdaAsync() { .invocationType(invocationType) .build(); - lambdaClient.invoke(request); + lambdaAsyncLatencyWatch.start(); + resp = lambdaClient.invoke(request); + lambdaAsyncLatencyWatch.stop(); + payloadResponseAsyncSize = resp.payload().asByteArray().length; } @Override public InvokeResponse flushToLambdaSync() { - InvokeResponse resp; + InvokeResponse resp = null; SdkBytes payload = getPayload(); + payloadRequestSyncSize = payload.asByteArray().length; // Setup an InvokeRequest. InvokeRequest request = InvokeRequest.builder() @@ -88,8 +106,16 @@ public InvokeResponse flushToLambdaSync() { .invocationType(invocationType) .build(); - resp = lambdaClient.invoke(request); - return resp; + lambdaSyncLatencyWatch.start(); + try { + resp = lambdaClient.invoke(request); + payloadResponseSyncSize = resp.payload().asByteArray().length; + lambdaSyncLatencyWatch.stop(); + return resp; + } catch (LambdaException e){ + lambdaSyncLatencyWatch.stop(); + throw new RuntimeException(e); + } } private SdkBytes validatePayload(String payload_string) { @@ -121,6 +147,30 @@ public SdkBytes getPayload() { byte[] bytes = byteArrayOutputStream.toByteArray(); SdkBytes sdkBytes = SdkBytes.fromByteArray(bytes); return sdkBytes; - } + } + + public Duration getFlushLambdaSyncLatencyMetric (){ + return Duration.ofMillis(lambdaSyncLatencyWatch.getTime(TimeUnit.MILLISECONDS)); + } + + public Duration getFlushLambdaAsyncLatencyMetric (){ + return Duration.ofMillis(lambdaAsyncLatencyWatch.getTime(TimeUnit.MILLISECONDS)); + } + + public Long getPayloadRequestSyncSize() { + return payloadRequestSyncSize; + } + + public Long getPayloadResponseSyncSize() { + return payloadResponseSyncSize; + } + + public Long getPayloadRequestAsyncSize() { + return payloadRequestAsyncSize; + } + + public Long getPayloadResponseAsyncSize() { + return payloadResponseAsyncSize; + } } diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java similarity index 95% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java index 1f92b90b48..ca8ed6e574 100644 --- a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java @@ -17,7 +17,7 @@ public class ThresholdOptions { - private static final String DEFAULT_BYTE_CAPACITY = "6mb"; + private static final String DEFAULT_BYTE_CAPACITY = "3mb"; @JsonProperty("event_count") @Size(min = 0, max = 10000000, message = "event_count size should be between 0 and 10000000") diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java similarity index 97% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java index 54e484fd13..715ef3295d 100644 --- a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java @@ -26,7 +26,7 @@ import java.util.Collection; -@DataPrepperPlugin(name = "lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class) +@DataPrepperPlugin(name = "aws_lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class) public class LambdaSink extends AbstractSink> { private static final Logger LOG = LoggerFactory.getLogger(LambdaSink.class); diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java diff --git a/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java similarity index 100% rename from data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java rename to data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java diff --git a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java similarity index 100% rename from data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java rename to data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java diff --git a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java similarity index 100% rename from data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java rename to data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java diff --git a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java similarity index 100% rename from data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java rename to data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java diff --git a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java similarity index 100% rename from data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java rename to data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java diff --git a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java similarity index 93% rename from data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java rename to data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java index 5d12aca3da..98437b49fe 100644 --- a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java @@ -11,7 +11,7 @@ import org.opensearch.dataprepper.model.types.ByteCount; class ThresholdOptionsTest { - private static final String DEFAULT_BYTE_CAPACITY = "6mb"; + private static final String DEFAULT_BYTE_CAPACITY = "3mb"; private static final int DEFAULT_EVENT_COUNT = 0; @Test diff --git a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java similarity index 100% rename from data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java rename to data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java diff --git a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java similarity index 100% rename from data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java rename to data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java diff --git a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java similarity index 100% rename from data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java rename to data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java diff --git a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java similarity index 98% rename from data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java rename to data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java index 9a042014f0..1842795e7c 100644 --- a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java @@ -28,7 +28,7 @@ class LambdaSinkTest { public static final String S3_REGION = "us-east-1"; public static final String CODEC_PLUGIN_NAME = "json"; - public static final String SINK_PLUGIN_NAME = "lambda"; + public static final String SINK_PLUGIN_NAME = "aws_lambda"; public static final String SINK_PIPELINE_NAME = "lambda-sink-pipeline"; private LambdaSinkConfig lambdaSinkConfig; private LambdaSink lambdaSink; diff --git a/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java similarity index 100% rename from data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java rename to data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java diff --git a/data-prepper-plugins/aws-lambda/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/aws-lambda/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..23c33feb6d --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1,3 @@ +# To enable mocking of final classes with vanilla Mockito +# https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods +mock-maker-inline diff --git a/data-prepper-plugins/lambda/src/test/resources/simplelogger.properties b/data-prepper-plugins/aws-lambda/src/test/resources/simplelogger.properties similarity index 100% rename from data-prepper-plugins/lambda/src/test/resources/simplelogger.properties rename to data-prepper-plugins/aws-lambda/src/test/resources/simplelogger.properties diff --git a/data-prepper-plugins/lambda/README.md b/data-prepper-plugins/lambda/README.md deleted file mode 100644 index c0b2c29211..0000000000 --- a/data-prepper-plugins/lambda/README.md +++ /dev/null @@ -1,36 +0,0 @@ -# Lambda Sink - -This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing. - -## Usage -```aidl -lambda-pipeline: -... - sink: - - lambda: - aws: - region: "us-east-1" - sts_role_arn: "" - function_name: "uploadToS3Lambda" - max_retries: 3 - batch: - batch_key: "osi_key" - threshold: - event_count: 3 - maximum_size: 6mb - event_collect_timeout: 15s - dlq: - s3: - bucket: test-bucket - key_path_prefix: dlq/ -``` - -## Developer Guide - -The integration tests for this plugin do not run as part of the Data Prepper build. -The following command runs the integration tests: - -``` -./gradlew :data-prepper-plugins:lambda-sink:integrationTest -Dtests.sink.lambda.region="us-east-1" -Dtests.sink.lambda.functionName="lambda_test_function" -Dtests.sink.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role - -``` diff --git a/settings.gradle b/settings.gradle index 9d84b2ccf0..cb7e888c53 100644 --- a/settings.gradle +++ b/settings.gradle @@ -178,4 +178,4 @@ include 'data-prepper-plugins:mongodb' include 'data-prepper-plugins:rds-source' include 'data-prepper-plugins:http-source-common' include 'data-prepper-plugins:http-common' -include 'data-prepper-plugins:lambda' \ No newline at end of file +include 'data-prepper-plugins:aws-lambda' \ No newline at end of file From 3cd33e42577a98b43d2f8c5a5fef65f743198c8f Mon Sep 17 00:00:00 2001 From: Srikanth Govindarajan Date: Fri, 26 Jul 2024 17:25:52 -0700 Subject: [PATCH 2/2] Address comments Signed-off-by: Srikanth Govindarajan --- data-prepper-plugins/aws-lambda/README.md | 35 -------------------- data-prepper-plugins/aws-lambda/build.gradle | 4 --- 2 files changed, 39 deletions(-) diff --git a/data-prepper-plugins/aws-lambda/README.md b/data-prepper-plugins/aws-lambda/README.md index d995bc5202..4c49873350 100644 --- a/data-prepper-plugins/aws-lambda/README.md +++ b/data-prepper-plugins/aws-lambda/README.md @@ -1,39 +1,4 @@ -# Lambda Processor - -This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing. - -## Usage -```aidl -lambda-pipeline: -... - processor: - - aws_lambda: - aws: - region: "us-east-1" - sts_role_arn: "" - function_name: "uploadToS3Lambda" - max_retries: 3 - mode: "synchronous" - batch: - batch_key: "osi_key" - threshold: - event_count: 3 - maximum_size: 6mb - event_collect_timeout: 15s -``` - -## Developer Guide - -The integration tests for this plugin do not run as part of the Data Prepper build. -The following command runs the integration tests: - -``` -./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.processor.lambda.region="us-east-1" -Dtests.processor.lambda.functionName="lambda_test_function" -Dtests.processor.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role - -``` - - # Lambda Sink This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing. diff --git a/data-prepper-plugins/aws-lambda/build.gradle b/data-prepper-plugins/aws-lambda/build.gradle index d59f4fd066..be9280e8c8 100644 --- a/data-prepper-plugins/aws-lambda/build.gradle +++ b/data-prepper-plugins/aws-lambda/build.gradle @@ -65,10 +65,6 @@ task integrationTest(type: Test) { systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName') systemProperty 'tests.lambda.sink.sts_role_arn', System.getProperty('tests.lambda.sink.sts_role_arn') - systemProperty 'tests.lambda.processor.region', System.getProperty('tests.lambda.processor.region') - systemProperty 'tests.lambda.processor.functionName', System.getProperty('tests.lambda.processor.functionName') - systemProperty 'tests.lambda.processor.sts_role_arn', System.getProperty('tests.lambda.processor.sts_role_arn') - filter { includeTestsMatching '*IT' }