From 85c6b4b9422f9dbf4b43047760794140149d9476 Mon Sep 17 00:00:00 2001 From: Srikanth Govindarajan Date: Tue, 25 Jun 2024 09:51:28 -0700 Subject: [PATCH] Refactor lambda plugin (#4643) * Refactor lambda plugin Signed-off-by: Srikanth Govindarajan * Address comments Signed-off-by: Srikanth Govindarajan * Address comments 2 Signed-off-by: Srikanth Govindarajan --------- Signed-off-by: Srikanth Govindarajan Signed-off-by: Srikanth Govindarajan Signed-off-by: Krishna Kondaka --- .../{lambda-sink => lambda}/README.md | 0 .../{lambda-sink => lambda}/build.gradle | 5 ++ .../sink/lambda/LambdaSinkServiceIT.java | 17 +++--- .../lambda/common}/accumlator/Buffer.java | 7 ++- .../common}/accumlator/BufferFactory.java | 2 +- .../common}/accumlator/InMemoryBuffer.java | 20 ++++++- .../accumlator/InMemoryBufferFactory.java | 2 +- .../lambda/common}/codec/LambdaJsonCodec.java | 3 +- .../config/AwsAuthenticationOptions.java | 2 +- .../lambda/common}/config/BatchOptions.java | 4 +- .../common}/config/ThresholdOptions.java | 7 ++- .../lambda/common/util}/ThresholdCheck.java | 7 +-- .../lambda/sink}/LambdaClientFactory.java | 4 +- .../plugins/lambda/sink}/LambdaSink.java | 8 +-- .../lambda/sink}/LambdaSinkConfig.java | 8 +-- .../lambda/sink}/LambdaSinkService.java | 33 ++++++----- .../lambda/sink}/dlq/DlqPushHandler.java | 2 +- .../sink}/dlq/LambdaSinkFailedDlqData.java | 2 +- .../lambda/common}/ThresholdCheckTest.java | 12 ++-- .../InMemoryBufferFactoryTest.java | 6 +- .../accumulator/InMemoryBufferTest.java | 31 +++++----- .../common}/codec/LambdaJsonCodecTest.java | 2 +- .../common}/config/ThresholdOptionsTest.java | 7 +-- .../lambda/sink}/LambdaClientFactoryTest.java | 21 ++++--- .../lambda/sink}/LambdaSinkConfigTest.java | 5 +- .../lambda/sink}/LambdaSinkServiceTest.java | 58 +++++++++---------- .../plugins/lambda/sink}/LambdaSinkTest.java | 13 ++--- .../lambda/sink}/dlq/DlqPushHandlerTest.java | 19 +++--- .../org.mockito.plugins.MockMaker | 0 settings.gradle | 2 +- 30 files changed, 163 insertions(+), 146 deletions(-) rename data-prepper-plugins/{lambda-sink => lambda}/README.md (100%) rename data-prepper-plugins/{lambda-sink => lambda}/build.gradle (89%) rename data-prepper-plugins/{lambda-sink => lambda}/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java (93%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/accumlator/Buffer.java (68%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/accumlator/BufferFactory.java (82%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/accumlator/InMemoryBuffer.java (84%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/accumlator/InMemoryBufferFactory.java (85%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/codec/LambdaJsonCodec.java (95%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/config/AwsAuthenticationOptions.java (95%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/config/BatchOptions.java (80%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common}/config/ThresholdOptions.java (95%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util}/ThresholdCheck.java (84%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaClientFactory.java (93%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSink.java (93%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkConfig.java (90%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkService.java (92%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/dlq/DlqPushHandler.java (98%) rename data-prepper-plugins/{lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink}/dlq/LambdaSinkFailedDlqData.java (95%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common}/ThresholdCheckTest.java (95%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common}/accumulator/InMemoryBufferFactoryTest.java (78%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common}/accumulator/InMemoryBufferTest.java (95%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common}/codec/LambdaJsonCodecTest.java (98%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common}/config/ThresholdOptionsTest.java (93%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaClientFactoryTest.java (96%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkConfigTest.java (94%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkServiceTest.java (94%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink}/LambdaSinkTest.java (95%) rename data-prepper-plugins/{lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda => lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink}/dlq/DlqPushHandlerTest.java (95%) rename data-prepper-plugins/{lambda-sink => lambda}/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker (100%) diff --git a/data-prepper-plugins/lambda-sink/README.md b/data-prepper-plugins/lambda/README.md similarity index 100% rename from data-prepper-plugins/lambda-sink/README.md rename to data-prepper-plugins/lambda/README.md diff --git a/data-prepper-plugins/lambda-sink/build.gradle b/data-prepper-plugins/lambda/build.gradle similarity index 89% rename from data-prepper-plugins/lambda-sink/build.gradle rename to data-prepper-plugins/lambda/build.gradle index 429e190a6a..d0c09c9c8b 100644 --- a/data-prepper-plugins/lambda-sink/build.gradle +++ b/data-prepper-plugins/lambda/build.gradle @@ -19,6 +19,11 @@ dependencies { implementation'org.json:json' implementation libs.commons.lang3 implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + implementation 'org.projectlombok:lombok:1.18.22' + compileOnly 'org.projectlombok:lombok:1.18.20' + annotationProcessor 'org.projectlombok:lombok:1.18.20' + testCompileOnly 'org.projectlombok:lombok:1.18.20' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.20' testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' testImplementation project(':data-prepper-test-common') testImplementation project(':data-prepper-plugins:parse-json-processor') diff --git a/data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java b/data-prepper-plugins/lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java similarity index 93% rename from data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java rename to data-prepper-plugins/lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java index 89cf85ceac..76fb4831ce 100644 --- a/data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java +++ b/data-prepper-plugins/lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java @@ -17,6 +17,7 @@ import org.mockito.Mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.mockito.MockitoAnnotations; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; @@ -29,12 +30,14 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.config.ThresholdOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkConfig; +import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkService; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaClient; @@ -45,8 +48,6 @@ import java.util.HashMap; import java.util.List; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) class LambdaSinkServiceIT { diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java similarity index 68% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java index 48afbe6a01..f52a8e5de0 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/Buffer.java @@ -3,9 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; +package org.opensearch.dataprepper.plugins.lambda.common.accumlator; import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; import java.io.OutputStream; import java.time.Duration; @@ -21,7 +22,9 @@ public interface Buffer { Duration getDuration(); - void flushToLambda(); + void flushToLambdaAsync(); + + InvokeResponse flushToLambdaSync(); OutputStream getOutputStream(); diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java similarity index 82% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java index 80afd2f1ca..e44bbd6aee 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/BufferFactory.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; +package org.opensearch.dataprepper.plugins.lambda.common.accumlator; import software.amazon.awssdk.services.lambda.LambdaClient; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java similarity index 84% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java index bba70c6e62..5d9d5a5134 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; +package org.opensearch.dataprepper.plugins.lambda.common.accumlator; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -62,7 +62,22 @@ public Duration getDuration() { @Override - public void flushToLambda() { + public void flushToLambdaAsync() { + InvokeResponse resp; + SdkBytes payload = getPayload(); + + // Setup an InvokeRequest. + InvokeRequest request = InvokeRequest.builder() + .functionName(functionName) + .payload(payload) + .invocationType(invocationType) + .build(); + + lambdaClient.invoke(request); + } + + @Override + public InvokeResponse flushToLambdaSync() { InvokeResponse resp; SdkBytes payload = getPayload(); @@ -74,6 +89,7 @@ public void flushToLambda() { .build(); resp = lambdaClient.invoke(request); + return resp; } private SdkBytes validatePayload(String payload_string) { diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java similarity index 85% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java index e58952c5cb..37ad4a4105 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBufferFactory.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; +package org.opensearch.dataprepper.plugins.lambda.common.accumlator; import software.amazon.awssdk.services.lambda.LambdaClient; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodec.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodec.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java index 5bf21f5e18..a1ccaa8561 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodec.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodec.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.codec; +package org.opensearch.dataprepper.plugins.lambda.common.codec; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; @@ -37,7 +37,6 @@ public String getExtension() { @Override public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException { Objects.requireNonNull(outputStream); - Objects.requireNonNull(codecContext); this.codecContext = codecContext; generator = factory.createGenerator(outputStream, JsonEncoding.UTF8); if(Objects.nonNull(keyName)){ diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java index 8d6c64829d..e40fa617ee 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/AwsAuthenticationOptions.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.config; +package org.opensearch.dataprepper.plugins.lambda.common.config; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Size; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java similarity index 80% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java index 3773d4e6ed..099bed2b54 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/BatchOptions.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.config; +package org.opensearch.dataprepper.plugins.lambda.common.config; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotNull; @@ -18,7 +18,7 @@ public class BatchOptions { @JsonProperty("threshold") @NotNull - ThresholdOptions thresholdOptions; + ThresholdOptions thresholdOptions = new ThresholdOptions(); public String getBatchKey(){return batchKey;} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java index 031157c4be..1f92b90b48 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptions.java @@ -3,15 +3,16 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.config; +package org.opensearch.dataprepper.plugins.lambda.common.config; import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; import org.hibernate.validator.constraints.time.DurationMax; import org.hibernate.validator.constraints.time.DurationMin; import org.opensearch.dataprepper.model.types.ByteCount; + import java.time.Duration; -import jakarta.validation.constraints.NotNull; -import jakarta.validation.constraints.Size; public class ThresholdOptions { diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java similarity index 84% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java index 74aa98e7f9..6bbf8a4ab8 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java @@ -3,10 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.common.util; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; import java.time.Duration; @@ -15,9 +15,6 @@ */ public class ThresholdCheck { - private ThresholdCheck() { - } - public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final Duration maxCollectionDuration, final Boolean isBatchEnabled) { if (!isBatchEnabled) return true; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java similarity index 93% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java index 3e33a4e835..03b94340f0 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactory.java @@ -3,11 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java similarity index 93% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java index b1ef905233..54e484fd13 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSink.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; @@ -17,9 +17,9 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.sink.SinkContext; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.lambda.LambdaClient; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java similarity index 90% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java index a20fa41181..bb50e2510e 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfig.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; @@ -10,11 +10,11 @@ import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Size; import org.opensearch.dataprepper.model.configuration.PluginModel; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; -import java.util.Objects; import java.util.Map; +import java.util.Objects; public class LambdaSinkConfig { diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java similarity index 92% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java index f10607e7d1..9a788e6816 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkService.java @@ -3,29 +3,30 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.codec.LambdaJsonCodec; -import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.LambdaSinkFailedDlqData; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.codec.LambdaJsonCodec; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.LambdaSinkFailedDlqData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.lambda.LambdaClient; @@ -48,7 +49,7 @@ public class LambdaSinkService { private final PluginSetting pluginSetting; private final Lock reentrantLock; private final LambdaSinkConfig lambdaSinkConfig; - private LambdaClient lambdaClient; + private final LambdaClient lambdaClient; private final String functionName; private int maxEvents = 0; private ByteCount maxBytes = null; @@ -65,9 +66,9 @@ public class LambdaSinkService { private final List events; private OutputCodec codec = null; private final BatchOptions batchOptions; - private Boolean isBatchEnabled; + private final Boolean isBatchEnabled; private OutputCodecContext codecContext = null; - private String batchKey; + private final String batchKey; public LambdaSinkService(final LambdaClient lambdaClient, final LambdaSinkConfig lambdaSinkConfig, @@ -213,7 +214,7 @@ protected boolean retryFlushToLambda(Buffer currentBuffer, do { try { - currentBuffer.flushToLambda(); + currentBuffer.flushToLambdaAsync(); isUploadedToLambda = Boolean.TRUE; } catch (AwsServiceException | SdkClientException e) { errorMsgObj.set(e.getMessage()); diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java similarity index 98% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java index 1bdeb0a394..da8c52eb4e 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandler.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.dlq; +package org.opensearch.dataprepper.plugins.lambda.sink.dlq; import com.fasterxml.jackson.databind.ObjectWriter; import io.micrometer.core.instrument.util.StringUtils; diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java rename to data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java index 0808010e37..8941966b77 100644 --- a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java +++ b/data-prepper-plugins/lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/LambdaSinkFailedDlqData.java @@ -2,7 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.dlq; +package org.opensearch.dataprepper.plugins.lambda.sink.dlq; import com.fasterxml.jackson.core.JsonProcessingException; import software.amazon.awssdk.core.SdkBytes; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java index b63553911a..d56420d18f 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java @@ -3,23 +3,23 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.common; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import static org.mockito.Mockito.when; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck; import java.io.IOException; import java.time.Duration; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) class ThresholdCheckTest { diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java similarity index 78% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java index d161b28bb0..37276db819 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferFactoryTest.java @@ -3,12 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumulator; +package org.opensearch.dataprepper.plugins.lambda.common.accumulator; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java index 478650a300..fb164b1ac1 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/accumulator/InMemoryBufferTest.java @@ -3,16 +3,26 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.accumulator; +package org.opensearch.dataprepper.plugins.lambda.common.accumulator; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; import org.hamcrest.Matchers; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import org.junit.jupiter.api.Assertions; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import static org.mockito.ArgumentMatchers.any; import org.mockito.Mock; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBuffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBuffer; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.lambda.LambdaClient; @@ -25,17 +35,6 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) class InMemoryBufferTest { @@ -119,7 +118,7 @@ void test_with_write_event_into_buffer_and_flush_toLambda() throws IOException { inMemoryBuffer.setEventCount(eventCount); } assertDoesNotThrow(() -> { - inMemoryBuffer.flushToLambda(); + inMemoryBuffer.flushToLambdaAsync(); }); } @@ -136,7 +135,7 @@ void test_uploadedToLambda_success() throws IOException { OutputStream outputStream = inMemoryBuffer.getOutputStream(); outputStream.write(generateByteArray()); assertDoesNotThrow(() -> { - inMemoryBuffer.flushToLambda(); + inMemoryBuffer.flushToLambdaAsync(); }); } @@ -153,7 +152,7 @@ void test_uploadedToLambda_fails() { inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); Assertions.assertNotNull(inMemoryBuffer); - SdkClientException actualException = assertThrows(SdkClientException.class, () -> inMemoryBuffer.flushToLambda()); + SdkClientException actualException = assertThrows(SdkClientException.class, () -> inMemoryBuffer.flushToLambdaAsync()); assertThat(actualException, Matchers.equalTo(sdkClientException)); } diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodecTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java similarity index 98% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodecTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java index 6de6ce8a0e..4b6e4c5caf 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodecTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/codec/LambdaJsonCodecTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.codec; +package org.opensearch.dataprepper.plugins.lambda.common.codec; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java similarity index 93% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java index 53bd0a4edf..5d12aca3da 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ThresholdOptionsTest.java @@ -3,13 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.config; - -import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.model.types.ByteCount; +package org.opensearch.dataprepper.plugins.lambda.common.config; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; class ThresholdOptionsTest { private static final String DEFAULT_BYTE_CAPACITY = "6mb"; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java similarity index 96% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java index ab72ee44b8..9ed5c71fb2 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaClientFactoryTest.java @@ -2,35 +2,34 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; +import static org.mockito.ArgumentMatchers.any; import org.mockito.Mock; import org.mockito.MockedStatic; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaClient; import software.amazon.awssdk.services.lambda.LambdaClientBuilder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import java.util.Map; import java.util.UUID; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java similarity index 94% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java index eda9488a04..2a6dad3a69 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkConfigTest.java @@ -2,12 +2,13 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.hamcrest.MatcherAssert; import org.junit.jupiter.api.Test; import software.amazon.awssdk.regions.Region; @@ -21,7 +22,7 @@ class LambdaSinkConfigTest { @Test void lambda_sink_default_max_connection_retries_test(){ - assertThat(new LambdaSinkConfig().getMaxConnectionRetries(),equalTo(DEFAULT_MAX_RETRIES)); + MatcherAssert.assertThat(new LambdaSinkConfig().getMaxConnectionRetries(),equalTo(DEFAULT_MAX_RETRIES)); } @Test diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java similarity index 94% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java index bbab8778c0..4e678c191d 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkServiceTest.java @@ -2,34 +2,46 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import io.micrometer.core.instrument.Counter; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertEquals; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBuffer; -import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.config.ThresholdOptions; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; -import org.opensearch.dataprepper.plugins.sink.lambda.dlq.LambdaSinkFailedDlqData; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBuffer; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.lambda.sink.dlq.LambdaSinkFailedDlqData; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.http.SdkHttpResponse; @@ -37,7 +49,6 @@ import software.amazon.awssdk.services.lambda.model.InvokeRequest; import software.amazon.awssdk.services.lambda.model.InvokeResponse; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.time.Duration; @@ -47,19 +58,6 @@ import java.util.List; import java.util.Map; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.ArgumentMatchers.any; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; - public class LambdaSinkServiceTest { public static final int maxEvents = 10; @@ -193,7 +191,7 @@ public void lambda_sink_test_max_retires_works() throws IOException { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); when(buffer.getOutputStream()).thenReturn(byteArrayOutputStream); when(bufferFactory.getBuffer(any(LambdaClient.class),any(),any())).thenReturn(buffer); - doThrow(AwsServiceException.class).when(buffer).flushToLambda(); + doThrow(AwsServiceException.class).when(buffer).flushToLambdaAsync(); LambdaSinkService lambdaSinkService = new LambdaSinkService(lambdaClient, lambdaSinkConfig, @@ -209,7 +207,7 @@ public void lambda_sink_test_max_retires_works() throws IOException { Collection> records = List.of(eventRecord); lambdaSinkService.output(records); - verify(buffer, times(3)).flushToLambda(); + verify(buffer, times(3)).flushToLambdaAsync(); } @Test @@ -232,7 +230,7 @@ public void lambda_sink_test_dlq_works() throws IOException { when(buffer.getOutputStream()).thenReturn(byteArrayOutputStream); when(bufferFactory.getBuffer(any(LambdaClient.class),any(),any())).thenReturn(buffer); - doThrow(AwsServiceException.class).when(buffer).flushToLambda(); + doThrow(AwsServiceException.class).when(buffer).flushToLambdaAsync(); LambdaSinkService lambdaSinkService = new LambdaSinkService(lambdaClient, lambdaSinkConfig, @@ -249,7 +247,7 @@ public void lambda_sink_test_dlq_works() throws IOException { lambdaSinkService.output(records); - verify(buffer, times(3)).flushToLambda(); + verify(buffer, times(3)).flushToLambdaAsync(); verify(dlqPushHandler,times(1)).perform(any(PluginSetting.class),any(Object.class)); } diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java index 1687cbd285..9a042014f0 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/LambdaSinkTest.java @@ -3,28 +3,27 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda; +package org.opensearch.dataprepper.plugins.lambda.sink; import org.junit.jupiter.api.Assertions; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.sink.SinkContext; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaClient; import java.util.HashMap; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - class LambdaSinkTest { public static final String S3_REGION = "us-east-1"; diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java similarity index 95% rename from data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java rename to data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java index 17f39973b7..e1de3303a1 100644 --- a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java +++ b/data-prepper-plugins/lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/sink/dlq/DlqPushHandlerTest.java @@ -2,17 +2,24 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.lambda.dlq; +package org.opensearch.dataprepper.plugins.lambda.sink.dlq; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; -import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; import software.amazon.awssdk.core.SdkBytes; import java.io.IOException; @@ -20,14 +27,6 @@ import java.util.Map; import java.util.Optional; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - class DlqPushHandlerTest { private static final String BUCKET = "bucket"; diff --git a/data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/lambda/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker similarity index 100% rename from data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker rename to data-prepper-plugins/lambda/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/settings.gradle b/settings.gradle index 8400ff98c2..64d86219ea 100644 --- a/settings.gradle +++ b/settings.gradle @@ -175,4 +175,4 @@ include 'data-prepper-plugins:mongodb' include 'data-prepper-plugins:rds-source' include 'data-prepper-plugins:http-source-common' include 'data-prepper-plugins:http-common' -include 'data-prepper-plugins:lambda-sink' \ No newline at end of file +include 'data-prepper-plugins:lambda' \ No newline at end of file