From f0fcd11092b27052dc600efa5ebb53284f589795 Mon Sep 17 00:00:00 2001 From: srigovs Date: Thu, 22 Feb 2024 13:00:13 -0800 Subject: [PATCH] Add support for lambda sink Signed-off-by: srigovs --- .../config/PipelineParserConfiguration.java | 3 + .../parser/PipelineTransformerTests.java | 3 + .../processor/flatten/FlattenProcessor.java | 19 + .../flatten/FlattenProcessorConfig.java | 12 + .../flatten/FlattenProcessorConfigTest.java | 6 + .../flatten/FlattenProcessorTest.java | 12 + .../truststore/TrustStoreProvider.java | 20 +- .../util/CustomClientSslEngineFactory.java | 5 +- .../util/KafkaSecurityConfigurerTest.java | 2 +- data-prepper-plugins/lambda-sink/README.md | 36 ++ data-prepper-plugins/lambda-sink/build.gradle | 63 ++++ .../sink/lambda/LambdaSinkServiceIT.java | 216 ++++++++++++ .../sink/lambda/LambdaClientFactory.java | 46 +++ .../plugins/sink/lambda/LambdaSink.java | 105 ++++++ .../plugins/sink/lambda/LambdaSinkConfig.java | 80 +++++ .../sink/lambda/LambdaSinkService.java | 226 ++++++++++++ .../plugins/sink/lambda/ThresholdCheck.java | 33 ++ .../sink/lambda/accumlator/Buffer.java | 31 ++ .../sink/lambda/accumlator/BufferFactory.java | 14 + .../lambda/accumlator/InMemoryBuffer.java | 110 ++++++ .../accumlator/InMemoryBufferFactory.java | 16 + .../sink/lambda/codec/LambdaJsonCodec.java | 79 +++++ .../config/AwsAuthenticationOptions.java | 46 +++ .../sink/lambda/config/BatchOptions.java | 27 ++ .../sink/lambda/config/ThresholdOptions.java | 58 ++++ .../sink/lambda/dlq/DlqPushHandler.java | 131 +++++++ .../lambda/dlq/LambdaSinkFailedDlqData.java | 60 ++++ .../sink/lambda/LambdaClientFactoryTest.java | 98 ++++++ .../sink/lambda/LambdaSinkConfigTest.java | 69 ++++ .../sink/lambda/LambdaSinkServiceTest.java | 326 ++++++++++++++++++ .../plugins/sink/lambda/LambdaSinkTest.java | 82 +++++ .../sink/lambda/ThresholdCheckTest.java | 126 +++++++ .../InMemoryBufferFactoryTest.java | 32 ++ .../accumulator/InMemoryBufferTest.java | 167 +++++++++ 
.../lambda/codec/LambdaJsonCodecTest.java | 110 ++++++ .../lambda/config/ThresholdOptionsTest.java | 33 ++ .../sink/lambda/dlq/DlqPushHandlerTest.java | 88 +++++ .../org.mockito.plugins.MockMaker | 3 + .../mongo/client/MongoDBConnection.java | 8 +- .../configuration/MongoDBSourceConfig.java | 2 +- .../mongo/client/MongoDBConnectionTest.java | 4 +- settings.gradle | 3 +- 42 files changed, 2593 insertions(+), 17 deletions(-) create mode 100644 data-prepper-plugins/lambda-sink/README.md create mode 100644 data-prepper-plugins/lambda-sink/build.gradle create mode 100644 data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java create mode 100644 
data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodec.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodecTest.java create mode 100644 
data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java index f4f586c5b6..edf6862632 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java @@ -13,8 +13,11 @@ import org.opensearch.dataprepper.parser.PipelineTransformer; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider; +<<<<<<< HEAD import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader; import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationReader; +======= +>>>>>>> df4e99518 (Extract parsing in data-prepper-core to data-prepper-pipeline-parser module (#4247)) import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser; import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator; import org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java index 07817f8ee0..a55b63a2b1 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java +++ 
b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java @@ -31,7 +31,10 @@ import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider; import org.opensearch.dataprepper.peerforwarder.PeerForwarderReceiveBuffer; import org.opensearch.dataprepper.pipeline.Pipeline; +<<<<<<< HEAD import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader; +======= +>>>>>>> df4e99518 (Extract parsing in data-prepper-core to data-prepper-pipeline-parser module (#4247)) import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser; import org.opensearch.dataprepper.pipeline.router.RouterFactory; import org.opensearch.dataprepper.plugin.DefaultPluginFactory; diff --git a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java index 2a07fd6d99..10fdf05d51 100644 --- a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java +++ b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java @@ -26,33 +26,47 @@ @DataPrepperPlugin(name = "flatten", pluginType = Processor.class, pluginConfigurationType = FlattenProcessorConfig.class) public class FlattenProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(FlattenProcessor.class); +<<<<<<< HEAD private static final String SEPARATOR = "/"; private final FlattenProcessorConfig config; private final ExpressionEvaluator expressionEvaluator; private final Map excludeKeysAndJsonPointers = new HashMap<>(); +======= + + private static final String SEPARATOR = "/"; + private final FlattenProcessorConfig config; + private final ExpressionEvaluator expressionEvaluator; +>>>>>>> 
185a1383f (Add flatten processor (#4138)) @DataPrepperPluginConstructor public FlattenProcessor(final PluginMetrics pluginMetrics, final FlattenProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); this.config = config; this.expressionEvaluator = expressionEvaluator; +<<<<<<< HEAD for (final String key : config.getExcludeKeys()) { excludeKeysAndJsonPointers.put(key, getJsonPointer(config.getSource(), key)); } +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) } @Override public Collection> doExecute(final Collection> records) { for (final Record record : records) { final Event recordEvent = record.getData(); +<<<<<<< HEAD final Map excludeMap = new HashMap<>(); +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) try { if (config.getFlattenWhen() != null && !expressionEvaluator.evaluateConditional(config.getFlattenWhen(), recordEvent)) { continue; } +<<<<<<< HEAD // remove fields specified in "exclude_keys" from the event temporarily for (final String key : excludeKeysAndJsonPointers.keySet()) { final String keyInEvent = excludeKeysAndJsonPointers.get(key); @@ -62,6 +76,8 @@ public Collection> doExecute(final Collection> recor } } +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) final String sourceJson = recordEvent.getAsJsonString(config.getSource()); // adds ignoreReservedCharacters() so that dots in keys are ignored during flattening @@ -83,12 +99,15 @@ public Collection> doExecute(final Collection> recor } catch (Exception e) { LOG.error("Fail to perform flatten operation", e); recordEvent.getMetadata().addTags(config.getTagsOnFailure()); +<<<<<<< HEAD } finally { // Add temporarily deleted fields back for (final String key : excludeMap.keySet()) { final String keyInEvent = excludeKeysAndJsonPointers.get(key); recordEvent.put(keyInEvent, excludeMap.get(key)); } +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) } } return records; diff --git 
a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java index 96c9d2e024..8489dd04dd 100644 --- a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java +++ b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java @@ -8,13 +8,19 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotNull; +<<<<<<< HEAD import java.util.ArrayList; +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) import java.util.List; public class FlattenProcessorConfig { +<<<<<<< HEAD private static final List DEFAULT_EXCLUDE_KEYS = new ArrayList<>(); +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) @NotNull @JsonProperty("source") private String source; @@ -29,9 +35,12 @@ public class FlattenProcessorConfig { @JsonProperty("remove_list_indices") private boolean removeListIndices = false; +<<<<<<< HEAD @JsonProperty("exclude_keys") private List excludeKeys = DEFAULT_EXCLUDE_KEYS; +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) @JsonProperty("flatten_when") private String flattenWhen; @@ -54,10 +63,13 @@ public boolean isRemoveListIndices() { return removeListIndices; } +<<<<<<< HEAD public List getExcludeKeys() { return excludeKeys; } +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) public String getFlattenWhen() { return flattenWhen; } diff --git a/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfigTest.java b/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfigTest.java index 
d11860df0e..a8a18ce097 100644 --- a/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfigTest.java +++ b/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfigTest.java @@ -7,8 +7,11 @@ import org.junit.jupiter.api.Test; +<<<<<<< HEAD import java.util.List; +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -23,6 +26,9 @@ void testDefaultConfig() { assertThat(FlattenProcessorConfig.isRemoveListIndices(), equalTo(false)); assertThat(FlattenProcessorConfig.getFlattenWhen(), equalTo(null)); assertThat(FlattenProcessorConfig.getTagsOnFailure(), equalTo(null)); +<<<<<<< HEAD assertThat(FlattenProcessorConfig.getExcludeKeys(), equalTo(List.of())); +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) } } diff --git a/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java b/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java index 737d245ff5..d562c774a3 100644 --- a/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java +++ b/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java @@ -8,9 +8,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +<<<<<<< HEAD import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) import org.mockito.Mock; import 
org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.expression.ExpressionEvaluator; @@ -25,7 +28,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; +<<<<<<< HEAD import java.util.stream.Stream; +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -54,7 +60,10 @@ void setUp() { lenient().when(mockConfig.isRemoveListIndices()).thenReturn(false); lenient().when(mockConfig.getFlattenWhen()).thenReturn(null); lenient().when(mockConfig.getTagsOnFailure()).thenReturn(new ArrayList<>()); +<<<<<<< HEAD lenient().when(mockConfig.getExcludeKeys()).thenReturn(new ArrayList<>()); +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) } @Test @@ -226,6 +235,7 @@ void testFailureTagsAreAddedWhenException() { assertThat(resultEvent.getMetadata().getTags(), is(new HashSet<>(testTags))); } +<<<<<<< HEAD @ParameterizedTest @MethodSource("excludeKeysTestArguments") void testFlattenWithExcludeKeys(String source, String target, List excludeKeys, Map expectedResultMap) { @@ -330,6 +340,8 @@ private static Stream excludeKeysTestArguments() { ); } +======= +>>>>>>> 185a1383f (Add flatten processor (#4138)) private FlattenProcessor createObjectUnderTest() { return new FlattenProcessor(pluginMetrics, mockConfig, expressionEvaluator); } diff --git a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProvider.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProvider.java index fb28a5d7c4..483e7430b7 100644 --- a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProvider.java +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/truststore/TrustStoreProvider.java @@ -25,7 +25,7 @@ public class TrustStoreProvider { private static final 
Logger LOG = LoggerFactory.getLogger(TrustStoreProvider.class); - public static TrustManager[] createTrustManager(final Path certificatePath) { + public TrustManager[] createTrustManager(final Path certificatePath) { LOG.info("Using the certificate path {} to create trust manager.", certificatePath.toString()); try { final KeyStore keyStore = createKeyStore(certificatePath); @@ -37,7 +37,7 @@ public static TrustManager[] createTrustManager(final Path certificatePath) { } } - public static TrustManager[] createTrustManager(final String certificateContent) { + public TrustManager[] createTrustManager(final String certificateContent) { LOG.info("Using the certificate content to create trust manager."); try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) { final KeyStore keyStore = createKeyStore(certificateInputStream); @@ -49,19 +49,22 @@ public static TrustManager[] createTrustManager(final String certificateContent) } } - public static TrustManager[] createTrustAllManager() { + public TrustManager[] createTrustAllManager() { LOG.info("Using the trust all manager to create trust manager."); return new TrustManager[]{ new X509TrustAllManager() }; } - private static KeyStore createKeyStore(final Path certificatePath) throws Exception { + private KeyStore createKeyStore(final Path certificatePath) throws Exception { try (InputStream certificateInputStream = Files.newInputStream(certificatePath)) { return createKeyStore(certificateInputStream); } } +<<<<<<< HEAD + private KeyStore createKeyStore(final InputStream certificateInputStream) throws Exception { +======= private static KeyStore createKeyStore(final InputStream trustStoreInputStream, final String password) throws Exception { final KeyStore trustStore = KeyStore.getInstance("JKS"); trustStore.load(trustStoreInputStream, password.toCharArray()); @@ -69,6 +72,7 @@ private static KeyStore createKeyStore(final InputStream trustStoreInputStream, } private static KeyStore 
createKeyStore(final InputStream certificateInputStream) throws Exception { +>>>>>>> e7570ca1a (Add support for creating SSLContext for trustStore file path (#4264)) final CertificateFactory factory = CertificateFactory.getInstance("X.509"); final Certificate trustedCa = factory.generateCertificate(certificateInputStream); final KeyStore trustStore = KeyStore.getInstance("pkcs12"); @@ -77,7 +81,7 @@ private static KeyStore createKeyStore(final InputStream certificateInputStream) return trustStore; } - public static SSLContext createSSLContext(final Path certificatePath) { + public SSLContext createSSLContext(final Path certificatePath) { LOG.info("Using the certificate path to create SSL context."); try (InputStream is = Files.newInputStream(certificatePath)) { return createSSLContext(is); @@ -86,6 +90,9 @@ public static SSLContext createSSLContext(final Path certificatePath) { } } +<<<<<<< HEAD + public SSLContext createSSLContext(final String certificateContent) { +======= public static SSLContext createSSLContext(final Path trustStorePath, final String password) { LOG.info("Using the truststore path and password to create SSL context."); try (InputStream is = Files.newInputStream(trustStorePath)) { @@ -96,6 +103,7 @@ public static SSLContext createSSLContext(final Path trustStorePath, final Strin } public static SSLContext createSSLContext(final String certificateContent) { +>>>>>>> e7570ca1a (Add support for creating SSLContext for trustStore file path (#4264)) LOG.info("Using the certificate content to create SSL context."); try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) { return createSSLContext(certificateInputStream); @@ -121,7 +129,7 @@ private static SSLContext createSSLContext(final InputStream certificateInputStr return sslContext; } - public static SSLContext createSSLContextWithTrustAllStrategy() { + public SSLContext createSSLContextWithTrustAllStrategy() { LOG.info("Using the trust all strategy to 
create SSL context."); try { return SSLContexts.custom().loadTrustMaterial(null, new TrustStrategy() { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactory.java index 83c52d736b..932b818618 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/CustomClientSslEngineFactory.java @@ -28,11 +28,12 @@ public void configure(Map configs) { } private TrustManager[] getTrustManager() { + final TrustStoreProvider trustStoreProvider = new TrustStoreProvider(); final TrustManager[] trustManagers; if (Objects.nonNull(certificateContent)) { - trustManagers = TrustStoreProvider.createTrustManager(certificateContent); + trustManagers = trustStoreProvider.createTrustManager(certificateContent); } else { - trustManagers = TrustStoreProvider.createTrustAllManager(); + trustManagers = trustStoreProvider.createTrustAllManager(); } return trustManagers; } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java index f1a9af8436..df37803f1b 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java @@ -193,4 +193,4 @@ private KafkaSourceConfig createKafkaSinkConfig(final String fileName) throws IO final Reader reader = new StringReader(json); return 
mapper.readValue(reader, KafkaSourceConfig.class); } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/README.md b/data-prepper-plugins/lambda-sink/README.md new file mode 100644 index 0000000000..8d05b7485d --- /dev/null +++ b/data-prepper-plugins/lambda-sink/README.md @@ -0,0 +1,36 @@ +# Lambda Sink + +This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing. + +## Usage +```aidl +lambda-pipeline: +... + sink: + - lambda: + aws: + region: "us-east-1" + sts_role_arn: "" + function_name: "uploadToS3Lambda" + max_retries: 3 + batch: + batch_key: "osi_key" + threshold: + event_count: 3 + maximum_size: 6mb + event_collect_timeout: 15s + dlq: + s3: + bucket: test-bucket + key_path_prefix: dlq/ +``` + +## Developer Guide + +The integration tests for this plugin do not run as part of the Data Prepper build. +The following command runs the integration tests: + +``` +./gradlew :data-prepper-plugins:lambda-sink:integrationTest -Dtests.sink.lambda.region="us-east-1" -Dtests.sink.lambda.functionName="lambda_test_function" -Dtests.sink.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role + +``` \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/build.gradle b/data-prepper-plugins/lambda-sink/build.gradle new file mode 100644 index 0000000000..429e190a6a --- /dev/null +++ b/data-prepper-plugins/lambda-sink/build.gradle @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +dependencies { + implementation project(':data-prepper-api') + implementation project(path: ':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:aws-plugin-api') + implementation project(':data-prepper-plugins:failures-common') + implementation 'io.micrometer:micrometer-core' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + 
implementation 'software.amazon.awssdk:lambda:2.17.99' + implementation 'software.amazon.awssdk:sdk-core:2.x.x' + implementation 'software.amazon.awssdk:sts' + implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + implementation'org.json:json' + implementation libs.commons.lang3 + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + testImplementation project(':data-prepper-test-common') + testImplementation project(':data-prepper-plugins:parse-json-processor') +} + +test { + useJUnitPlatform() +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + + systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties' + systemProperty 'tests.sink.lambda.region', System.getProperty('tests.sink.lambda.region') + systemProperty 'tests.sink.lambda.functionName', System.getProperty('tests.sink.lambda.functionName') + systemProperty 'tests.sink.lambda.sts_role_arn', System.getProperty('tests.sink.lambda.sts_role_arn') + + filter { + includeTestsMatching '*IT' + } +} diff --git a/data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java 
b/data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java new file mode 100644 index 0000000000..89cf85ceac --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java @@ -0,0 +1,216 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; +import 
org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.config.ThresholdOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class LambdaSinkServiceIT { + + private LambdaClient lambdaClient; + private String functionName; + private String lambdaRegion; + private String role; + private BufferFactory bufferFactory; + @Mock + private LambdaSinkConfig lambdaSinkConfig; + @Mock + private BatchOptions batchOptions; + @Mock + private ThresholdOptions thresholdOptions; + @Mock + private AwsAuthenticationOptions awsAuthenticationOptions; + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock + private PluginMetrics pluginMetrics; + @Mock + private DlqPushHandler dlqPushHandler; + @Mock + private PluginFactory pluginFactory; + @Mock + private PluginSetting pluginSetting; + @Mock + private Counter numberOfRecordsSuccessCounter; + @Mock + private Counter numberOfRecordsFailedCounter; + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + + @BeforeEach + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this); + lambdaRegion = System.getProperty("tests.sink.lambda.region"); + functionName = System.getProperty("tests.sink.lambda.functionName"); + role = System.getProperty("tests.sink.lambda.sts_role_arn"); + + final Region region = Region.of(lambdaRegion); + + lambdaClient = LambdaClient.builder() + 
.region(Region.of(lambdaRegion)) + .build(); + + bufferFactory = new InMemoryBufferFactory(); + + when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)). + thenReturn(numberOfRecordsSuccessCounter); + when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)). + thenReturn(numberOfRecordsFailedCounter); + } + + + private static Record createRecord() { + final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build(); + return new Record<>(event); + } + + public LambdaSinkService createObjectUnderTest(final String config) throws JsonProcessingException { + + final LambdaSinkConfig lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class); + OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList()); + pluginFactory = null; + return new LambdaSinkService(lambdaClient, + lambdaSinkConfig, + pluginMetrics, + pluginFactory, + pluginSetting, + codecContext, + awsCredentialsSupplier, + dlqPushHandler, + bufferFactory); + } + + public LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig) throws JsonProcessingException { + + OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList()); + pluginFactory = null; + return new LambdaSinkService(lambdaClient, + lambdaSinkConfig, + pluginMetrics, + pluginFactory, + pluginSetting, + codecContext, + awsCredentialsSupplier, + dlqPushHandler, + bufferFactory); + } + + + private static Collection> generateRecords(int numberOfRecords) { + List> recordList = new ArrayList<>(); + + for (int rows = 0; rows < numberOfRecords; rows++) { + HashMap eventData = new HashMap<>(); + eventData.put("name", "Person" + rows); + eventData.put("age", Integer.toString(rows)); + + Record eventRecord = new Record<>(JacksonEvent.builder().withData(eventData).withEventType("event").build()); + recordList.add(eventRecord); + 
} + return recordList; + } + + @ParameterizedTest + @ValueSource(ints = {1,5}) + void verify_flushed_records_to_lambda_success(final int recordCount) throws Exception { + + final String LAMBDA_SINK_CONFIG_YAML = + " function_name: " + functionName +"\n" + + " aws:\n" + + " region: us-east-1\n" + + " sts_role_arn: " + role + "\n" + + " max_retries: 3\n"; + LambdaSinkService objectUnderTest = createObjectUnderTest(LAMBDA_SINK_CONFIG_YAML); + + Collection> recordsData = generateRecords(recordCount); + objectUnderTest.output(recordsData); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + + verify(numberOfRecordsSuccessCounter, times(recordCount)).increment(1); + } + + @ParameterizedTest + @ValueSource(ints = {1,5,10}) + void verify_flushed_records_to_lambda_failed_and_dlq_works(final int recordCount) throws Exception { + final String LAMBDA_SINK_CONFIG_INVALID_FUNCTION_NAME = + " function_name: $$$\n" + + " aws:\n" + + " region: us-east-1\n" + + " sts_role_arn: arn:aws:iam::176893235612:role/osis-s3-opensearch-role\n" + + " max_retries: 3\n" + + " dlq: #any failed even\n"+ + " s3:\n"+ + " bucket: test-bucket\n"+ + " key_path_prefix: dlq/\n"; + LambdaSinkService objectUnderTest = createObjectUnderTest(LAMBDA_SINK_CONFIG_INVALID_FUNCTION_NAME); + + Collection> recordsData = generateRecords(recordCount); + objectUnderTest.output(recordsData); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + + verify( numberOfRecordsFailedCounter, times(recordCount)).increment(1); + } + + @ParameterizedTest + @ValueSource(ints = {2,5}) + void verify_flushed_records_with_batching_to_lambda(final int recordCount) throws JsonProcessingException, InterruptedException { + + int event_count = 2; + when(lambdaSinkConfig.getFunctionName()).thenReturn(functionName); + when(lambdaSinkConfig.getMaxConnectionRetries()).thenReturn(3); + when(thresholdOptions.getEventCount()).thenReturn(event_count); + when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb")); + 
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s")); + when(batchOptions.getBatchKey()).thenReturn("lambda_batch_key"); + when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions); + when(lambdaSinkConfig.getBatchOptions()).thenReturn(batchOptions); + + LambdaSinkService objectUnderTest = createObjectUnderTest(lambdaSinkConfig); + Collection> recordsData = generateRecords(recordCount); + objectUnderTest.output(recordsData); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java new file mode 100644 index 0000000000..3e33a4e835 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.services.lambda.LambdaClient; + +public final class LambdaClientFactory { + private LambdaClientFactory() { } + + static LambdaClient createLambdaClient(final LambdaSinkConfig lambdaSinkConfig, + final AwsCredentialsSupplier awsCredentialsSupplier) { + final AwsCredentialsOptions awsCredentialsOptions = 
convertToCredentialsOptions(lambdaSinkConfig.getAwsAuthenticationOptions()); + final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); + + return LambdaClient.builder() + .region(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(awsCredentialsProvider) + .overrideConfiguration(createOverrideConfiguration(lambdaSinkConfig)).build(); + + } + + private static ClientOverrideConfiguration createOverrideConfiguration(final LambdaSinkConfig lambdaSinkConfig) { + final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(lambdaSinkConfig.getMaxConnectionRetries()).build(); + return ClientOverrideConfiguration.builder() + .retryPolicy(retryPolicy) + .build(); + } + + private static AwsCredentialsOptions convertToCredentialsOptions(final AwsAuthenticationOptions awsAuthenticationOptions) { + return AwsCredentialsOptions.builder() + .withRegion(awsAuthenticationOptions.getAwsRegion()) + .withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn()) + .withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId()) + .withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides()) + .build(); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java new file mode 100644 index 0000000000..f099da21cf --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import 
org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; +import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.util.Collection; + +@DataPrepperPlugin(name = "lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class) +public class LambdaSink extends AbstractSink> { + + private static final Logger LOG = LoggerFactory.getLogger(LambdaSink.class); + private final DlqPushHandler dlqPushHandler; + private volatile boolean sinkInitialized; + private final LambdaSinkService lambdaSinkService; + private final BufferFactory bufferFactory; + private static final String BUCKET = "bucket"; + private static final String KEY_PATH = "key_path_prefix"; + + @DataPrepperPluginConstructor + public LambdaSink(final PluginSetting pluginSetting, + final LambdaSinkConfig lambdaSinkConfig, + final PluginFactory pluginFactory, + final SinkContext sinkContext, + final AwsCredentialsSupplier awsCredentialsSupplier + ) { + super(pluginSetting); + sinkInitialized = Boolean.FALSE; + OutputCodecContext outputCodecContext = OutputCodecContext.fromSinkContext(sinkContext); + 
LambdaClient lambdaClient = LambdaClientFactory.createLambdaClient(lambdaSinkConfig, awsCredentialsSupplier); + this.dlqPushHandler = new DlqPushHandler(pluginFactory, + String.valueOf(lambdaSinkConfig.getDlqPluginSetting().get(BUCKET)), + lambdaSinkConfig.getDlqStsRoleARN() + , lambdaSinkConfig.getDlqStsRegion(), + String.valueOf(lambdaSinkConfig.getDlqPluginSetting().get(KEY_PATH))); + this.bufferFactory = new InMemoryBufferFactory(); + + lambdaSinkService = new LambdaSinkService(lambdaClient, + lambdaSinkConfig, + pluginMetrics, + pluginFactory, + pluginSetting, + outputCodecContext, + awsCredentialsSupplier, + dlqPushHandler, + bufferFactory); + + } + + @Override + public boolean isReady() { + return sinkInitialized; + } + + @Override + public void doInitialize() { + try { + doInitializeInternal(); + } catch (InvalidPluginConfigurationException e) { + LOG.error("Invalid plugin configuration, Hence failed to initialize s3-sink plugin."); + this.shutdown(); + throw e; + } catch (Exception e) { + LOG.error("Failed to initialize lambda plugin."); + this.shutdown(); + throw e; + } + } + + private void doInitializeInternal() { + sinkInitialized = Boolean.TRUE; + } + + /** + * @param records Records to be output + */ + @Override + public void doOutput(final Collection> records) { + + if (records.isEmpty()) { + return; + } + lambdaSinkService.output(records); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java new file mode 100644 index 0000000000..a20fa41181 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java @@ -0,0 +1,80 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package 
org.opensearch.dataprepper.plugins.sink.lambda; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; + +import java.util.Objects; +import java.util.Map; + +public class LambdaSinkConfig { + + private static final int DEFAULT_CONNECTION_RETRIES = 3; + + public static final String STS_REGION = "region"; + + public static final String STS_ROLE_ARN = "sts_role_arn"; + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("function_name") + @NotEmpty + @NotNull + @Size(min = 3, max = 500, message = "function name length should be at least 3 characters") + private String functionName; + + @JsonProperty("max_retries") + private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES; + + @JsonProperty("dlq") + private PluginModel dlq; + + @JsonProperty("batch") + private BatchOptions batchOptions; + + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + public BatchOptions getBatchOptions(){return batchOptions;} + + public String getFunctionName() { + return functionName; + } + + public int getMaxConnectionRetries() { + return maxConnectionRetries; + } + + public PluginModel getDlq() { + return dlq; + } + + public String getDlqStsRoleARN(){ + return Objects.nonNull(getDlqPluginSetting().get(STS_ROLE_ARN)) ? + String.valueOf(getDlqPluginSetting().get(STS_ROLE_ARN)) : + awsAuthenticationOptions.getAwsStsRoleArn(); + } + + public String getDlqStsRegion(){ + return Objects.nonNull(getDlqPluginSetting().get(STS_REGION)) ? 
+ String.valueOf(getDlqPluginSetting().get(STS_REGION)) : + awsAuthenticationOptions.getAwsRegion().toString(); + } + + public Map getDlqPluginSetting(){ + return dlq != null ? dlq.getPluginSettings() : Map.of(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java new file mode 100644 index 0000000000..ec1e83e238 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java @@ -0,0 +1,226 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.codec.LambdaJsonCodec; +import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.sink.lambda.dlq.LambdaSinkFailedDlqData; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LambdaSinkService { + + private static final Logger LOG = LoggerFactory.getLogger(LambdaSinkService.class); + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaSinkObjectsEventsSucceeded"; + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaSinkObjectsEventsFailed"; + private final PluginSetting pluginSetting; + private final Lock reentrantLock; + private final LambdaSinkConfig lambdaSinkConfig; + private LambdaClient lambdaClient; + private final String functionName; + private int maxEvents = 0; + private ByteCount maxBytes = null; + private Duration maxCollectionDuration = null; + private int maxRetries = 0; + private final Counter numberOfRecordsSuccessCounter; + private final Counter numberOfRecordsFailedCounter; + private final String ASYNC_INVOCATION_TYPE = "Event"; + private final String invocationType; + private Buffer currentBuffer; + private final BufferFactory bufferFactory; + private final DlqPushHandler dlqPushHandler; + private final Collection bufferedEventHandles; + private final List events; + private OutputCodec codec = null; + private final BatchOptions batchOptions; + private Boolean isBatchEnabled; + private OutputCodecContext codecContext = null; + private String batchKey; + + public LambdaSinkService(final LambdaClient lambdaClient, + final 
LambdaSinkConfig lambdaSinkConfig, + final PluginMetrics pluginMetrics, + final PluginFactory pluginFactory, + final PluginSetting pluginSetting, + final OutputCodecContext codecContext, + final AwsCredentialsSupplier awsCredentialsSupplier, + final DlqPushHandler dlqPushHandler, + final BufferFactory bufferFactory) { + this.lambdaSinkConfig = lambdaSinkConfig; + this.pluginSetting = pluginSetting; + this.dlqPushHandler = dlqPushHandler; + this.lambdaClient = lambdaClient; + reentrantLock = new ReentrantLock(); + numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS); + numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED); + functionName = lambdaSinkConfig.getFunctionName(); + + maxRetries = lambdaSinkConfig.getMaxConnectionRetries(); + batchOptions = lambdaSinkConfig.getBatchOptions(); + + if (!Objects.isNull(batchOptions)){ + maxEvents = batchOptions.getThresholdOptions().getEventCount(); + maxBytes = batchOptions.getThresholdOptions().getMaximumSize(); + maxCollectionDuration = batchOptions.getThresholdOptions().getEventCollectTimeOut(); + batchKey = batchOptions.getBatchKey(); + isBatchEnabled = true; + }else{ + batchKey = null; + isBatchEnabled = false; + } + this.codecContext = codecContext; + + codec = new LambdaJsonCodec(batchKey); + bufferedEventHandles = new LinkedList<>(); + events = new ArrayList(); + + invocationType = ASYNC_INVOCATION_TYPE; + + this.bufferFactory = bufferFactory; + try { + currentBuffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void output(Collection> records){ + // Don't acquire the lock if there's no work to be done + if (records.isEmpty() && currentBuffer.getEventCount() == 0) { + return; + } + List failedEvents = new ArrayList<>(); + Exception sampleException = null; + reentrantLock.lock(); + try { + for (Record record : records) { + final 
Event event = record.getData(); + try { + if (currentBuffer.getEventCount() == 0) { + codec.start(currentBuffer.getOutputStream(), event, codecContext); + } + codec.writeEvent(event, currentBuffer.getOutputStream()); + int count = currentBuffer.getEventCount() + 1; + currentBuffer.setEventCount(count); + + bufferedEventHandles.add(event.getEventHandle()); + } catch (Exception ex) { + if(sampleException == null) { + sampleException = ex; + } + failedEvents.add(event); + } + + flushToLambdaIfNeeded(); + } + } finally { + reentrantLock.unlock(); + } + + if(!failedEvents.isEmpty()) { + failedEvents + .stream() + .map(Event::getEventHandle) + .forEach(eventHandle -> eventHandle.release(false)); + LOG.error("Unable to add {} events to buffer. Dropping these events. Sample exception provided.", failedEvents.size(), sampleException); + } + } + + private void releaseEventHandles(final boolean result) { + for (EventHandle eventHandle : bufferedEventHandles) { + eventHandle.release(result); + } + bufferedEventHandles.clear(); + } + + private void flushToLambdaIfNeeded() { + LOG.trace("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", + currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration()); + final AtomicReference errorMsgObj = new AtomicReference<>(); + + try { + if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration, isBatchEnabled)) { + try { + codec.complete(currentBuffer.getOutputStream()); + LOG.info("Writing {} to Lambda with {} events and size of {} bytes.", + functionName, currentBuffer.getEventCount(), currentBuffer.getSize()); + final boolean isFlushToLambda = retryFlushToLambda(currentBuffer, errorMsgObj); + + if (isFlushToLambda) { + LOG.info("Successfully flushed to Lambda {}.", functionName); + numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount()); + } else { + LOG.error("Failed to save to Lambda {}", functionName); + 
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); + SdkBytes payload = currentBuffer.getPayload(); + dlqPushHandler.perform(pluginSetting,new LambdaSinkFailedDlqData(payload,errorMsgObj.get(),0)); + } + //release even if failed + releaseEventHandles(true); + //reset buffer after flush + currentBuffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); + } catch (final IOException e) { + releaseEventHandles(false); + LOG.error("Exception while completing codec", e); + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + protected boolean retryFlushToLambda(Buffer currentBuffer, + final AtomicReference errorMsgObj) throws InterruptedException { + boolean isUploadedToLambda = Boolean.FALSE; + int retryCount = maxRetries; + do { + + try { + currentBuffer.flushToLambda(); + isUploadedToLambda = Boolean.TRUE; + } catch (AwsServiceException | SdkClientException e) { + errorMsgObj.set(e.getMessage()); + LOG.error("Exception occurred while uploading records to lambda. 
Retry countdown : {} | exception:", + retryCount, e); + --retryCount; + if (retryCount == 0) { + return isUploadedToLambda; + } + Thread.sleep(5000); + } + } while (!isUploadedToLambda); + return isUploadedToLambda; + } +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java new file mode 100644 index 0000000000..74aa98e7f9 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; + +import java.time.Duration; + +/** + * Check threshold limits. 
+ */ +public class ThresholdCheck { + + private ThresholdCheck() { + } + + public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final Duration maxCollectionDuration, final Boolean isBatchEnabled) { + if (!isBatchEnabled) return true; + + if (maxEvents > 0) { + return currentBuffer.getEventCount() + 1 > maxEvents || + currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0 || + currentBuffer.getSize() > maxBytes.getBytes(); + } else { + return currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0 || + currentBuffer.getSize() > maxBytes.getBytes(); + } + } +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java new file mode 100644 index 0000000000..48afbe6a01 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; + +import software.amazon.awssdk.core.SdkBytes; + +import java.io.OutputStream; +import java.time.Duration; + +/** + * A buffer can hold data before flushing it. 
+ */ +public interface Buffer { + + long getSize(); + + int getEventCount(); + + Duration getDuration(); + + void flushToLambda(); + + OutputStream getOutputStream(); + + SdkBytes getPayload(); + + void setEventCount(int eventCount); +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java new file mode 100644 index 0000000000..80afd2f1ca --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; + +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.io.IOException; + +public interface BufferFactory { + Buffer getBuffer(LambdaClient lambdaClient, String functionName, String invocationType) throws IOException; +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java new file mode 100644 index 0000000000..bba70c6e62 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.time.StopWatch; +import software.amazon.awssdk.core.SdkBytes; +import 
software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.model.InvokeRequest; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * A buffer can hold in memory data and flushing it. + */ +public class InMemoryBuffer implements Buffer { + + private static final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + + private final LambdaClient lambdaClient; + private final String functionName; + private final String invocationType; + private int eventCount; + private final StopWatch watch; + private boolean isCodecStarted; + + + public InMemoryBuffer(LambdaClient lambdaClient, String functionName, String invocationType) { + this.lambdaClient = lambdaClient; + this.functionName = functionName; + this.invocationType = invocationType; + + byteArrayOutputStream.reset(); + eventCount = 0; + watch = new StopWatch(); + watch.start(); + isCodecStarted = false; + } + + @Override + public long getSize() { + return byteArrayOutputStream.size(); + } + + @Override + public int getEventCount() { + return eventCount; + } + + public Duration getDuration() { + return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS)); + } + + + @Override + public void flushToLambda() { + InvokeResponse resp; + SdkBytes payload = getPayload(); + + // Setup an InvokeRequest. 
+ InvokeRequest request = InvokeRequest.builder() + .functionName(functionName) + .payload(payload) + .invocationType(invocationType) + .build(); + + resp = lambdaClient.invoke(request); + } + + private SdkBytes validatePayload(String payload_string) { + ObjectMapper mapper = new ObjectMapper(); + try { + JsonNode jsonNode = mapper.readTree(byteArrayOutputStream.toByteArray()); + + // Convert the JsonNode back to a String to normalize it (removes extra spaces, trailing commas, etc.) + String normalizedJson = mapper.writeValueAsString(jsonNode); + return SdkBytes.fromUtf8String(normalizedJson); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + @Override + public void setEventCount(int eventCount) { + this.eventCount = eventCount; + } + + @Override + public OutputStream getOutputStream() { + return byteArrayOutputStream; + } + + @Override + public SdkBytes getPayload() { + byte[] bytes = byteArrayOutputStream.toByteArray(); + SdkBytes sdkBytes = SdkBytes.fromByteArray(bytes); + return sdkBytes; + } +} + diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java new file mode 100644 index 0000000000..e58952c5cb --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; + +import software.amazon.awssdk.services.lambda.LambdaClient; + + +public class InMemoryBufferFactory implements BufferFactory { + @Override + public Buffer getBuffer(LambdaClient lambdaClient, String functionName, String invocationType){ + return new InMemoryBuffer(lambdaClient, functionName, 
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.sink.lambda.accumlator;

import software.amazon.awssdk.services.lambda.LambdaClient;

/**
 * {@link BufferFactory} that produces {@link InMemoryBuffer} instances.
 */
public class InMemoryBufferFactory implements BufferFactory {
    @Override
    public Buffer getBuffer(LambdaClient lambdaClient, String functionName, String invocationType) {
        return new InMemoryBuffer(lambdaClient, functionName, invocationType);
    }
}

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.dataprepper.plugins.sink.lambda.codec;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;

/**
 * Output codec for the Lambda payload. With a batch key the payload is a
 * single document of the form {"&lt;keyName&gt;": [event, event, ...]};
 * without one, each event's map is written directly to the stream.
 */
public class LambdaJsonCodec implements OutputCodec {
    private static final String JSON = "json";
    private static final JsonFactory factory = new JsonFactory();
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final String keyName;
    private JsonGenerator generator;
    private OutputCodecContext codecContext;

    public LambdaJsonCodec(final String keyName) {
        this.keyName = keyName;
    }

    @Override
    public String getExtension() {
        return JSON;
    }

    /** Opens the generator and, when batching, the wrapping object/array. */
    @Override
    public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException {
        Objects.requireNonNull(outputStream);
        Objects.requireNonNull(codecContext);
        this.codecContext = codecContext;
        generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
        if (keyName == null) {
            return;
        }
        generator.writeStartObject();
        generator.writeFieldName(keyName);
        generator.writeStartArray();
    }

    /** Closes the wrapping structure (if any), the generator and the stream. */
    @Override
    public void complete(final OutputStream outputStream) throws IOException {
        if (keyName != null) {
            generator.writeEndArray();
            generator.writeEndObject();
        }

        generator.close();
        outputStream.flush();
        outputStream.close();
    }

    @Override
    public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
        Objects.requireNonNull(event);
        final Map<String, Object> eventData = event.toMap();
        if (keyName == null) {
            // NOTE(review): without a batch key, successive events are written back-to-back
            // with no separator, so more than one event does not form a single valid JSON
            // value — confirm this path is only reached with one event per buffer.
            objectMapper.writeValue(outputStream, eventData);
        } else {
            objectMapper.writeValue(generator, eventData);
        }
        generator.flush();
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.sink.lambda.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.regions.Region;

import java.util.Map;

/**
 * AWS authentication settings (the {@code aws:} block) for the Lambda sink:
 * region, optional STS role to assume, optional external id, and optional
 * STS header overrides.
 */
public class AwsAuthenticationOptions {
    @JsonProperty("region")
    @Size(min = 1, message = "Region cannot be empty string")
    private String awsRegion;

    // Fix: the message previously said "between 1 and 2048" while min = 20.
    @JsonProperty("sts_role_arn")
    @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 20 and 2048 characters")
    private String awsStsRoleArn;

    @JsonProperty("sts_external_id")
    @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
    private String awsStsExternalId;

    @JsonProperty("sts_header_overrides")
    @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
    private Map<String, String> awsStsHeaderOverrides;

    /** @return the configured region, or {@code null} when unset. */
    public Region getAwsRegion() {
        return awsRegion != null ? Region.of(awsRegion) : null;
    }

    /** @return ARN of the STS role to assume, or {@code null}. */
    public String getAwsStsRoleArn() {
        return awsStsRoleArn;
    }

    /** @return STS external id, or {@code null}. */
    public String getAwsStsExternalId() {
        return awsStsExternalId;
    }

    /** @return header overrides applied to the AssumeRole call, or {@code null}. */
    public Map<String, String> getAwsStsHeaderOverrides() {
        return awsStsHeaderOverrides;
    }
}

// ---- BatchOptions.java ----

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.sink.lambda.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;

/**
 * Batching configuration for the Lambda sink: the JSON key under which batched
 * events are grouped, plus the flush thresholds.
 */
public class BatchOptions {

    private static final String DEFAULT_BATCH_KEY = "events";

    @JsonProperty("batch_key")
    private String batchKey = DEFAULT_BATCH_KEY;

    // Fix: field was package-private; nothing outside this class should access
    // it directly — use getThresholdOptions().
    @JsonProperty("threshold")
    @NotNull
    private ThresholdOptions thresholdOptions;

    /** @return key name under which batched events are nested (default {@code "events"}). */
    public String getBatchKey() {
        return batchKey;
    }

    /** @return flush thresholds; required by validation. */
    public ThresholdOptions getThresholdOptions() {
        return thresholdOptions;
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.sink.lambda.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;
import org.opensearch.dataprepper.model.types.ByteCount;

import java.time.Duration;

/**
 * Thresholds that trigger a flush of buffered events to Lambda: an event-count
 * cap, a byte-size cap, and a collection timeout.
 */
public class ThresholdOptions {

    // 6mb matches the synchronous Lambda invocation payload limit.
    private static final String DEFAULT_BYTE_CAPACITY = "6mb";

    // Fix: the original used @Size(min, max) on an int. jakarta @Size only
    // applies to CharSequence, Collection, Map and arrays, and would fail with
    // an UnexpectedTypeException at validation time; @Min/@Max is the correct
    // constraint for a numeric range. @NotNull was dropped because a primitive
    // int can never be null.
    @JsonProperty("event_count")
    @Min(value = 0, message = "event_count size should be between 0 and 10000000")
    @Max(value = 10000000, message = "event_count size should be between 0 and 10000000")
    private int eventCount;

    @JsonProperty("maximum_size")
    private String maximumSize = DEFAULT_BYTE_CAPACITY;

    @JsonProperty("event_collect_timeout")
    @DurationMin(seconds = 1)
    @DurationMax(seconds = 3600)
    @NotNull
    private Duration eventCollectTimeOut;

    /**
     * Read event collection duration configuration.
     * @return event collect time out.
     */
    public Duration getEventCollectTimeOut() {
        return eventCollectTimeOut;
    }

    /**
     * Read byte capacity configuration.
     * @return maximum byte count.
     * @throws RuntimeException if the configured string is not a parsable byte count.
     */
    public ByteCount getMaximumSize() {
        return ByteCount.parse(maximumSize);
    }

    /**
     * Read the event count configuration.
     * @return event count.
     */
    public int getEventCount() {
        return eventCount;
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.dataprepper.plugins.sink.lambda.dlq;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import io.micrometer.core.instrument.util.StringUtils;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;

import static java.util.UUID.randomUUID;


/**
 * A handler which logs failed sink data either to an AWS S3 bucket (via the
 * s3 DLQ plugin) or to a local file, depending on configuration.
 */
public class DlqPushHandler {

    private static final Logger LOG = LoggerFactory.getLogger(DlqPushHandler.class);

    private static final String BUCKET = "bucket";

    private static final String ROLE_ARN = "sts_role_arn";

    private static final String REGION = "region";

    private static final String S3_PLUGIN_NAME = "s3";

    private static final String KEY_PATH_PREFIX = "key_path_prefix";

    // NOTE(review): dlqFile is never assigned anywhere in this class, so the
    // writeToFile() path is currently unreachable — confirm whether a
    // file-based-DLQ constructor is still planned.
    private String dlqFile;

    private String keyPathPrefix;

    private final DlqProvider dlqProvider;

    // Fix: was never initialized, so writeToFile() would have thrown an NPE
    // the moment the file path became reachable.
    private final ObjectWriter objectWriter = new ObjectMapper().writer();

    /**
     * Builds a handler backed by the s3 DLQ plugin.
     *
     * @param pluginFactory factory used to load the s3 DLQ provider plugin
     * @param bucket        target S3 bucket for failed data
     * @param stsRoleArn    role assumed when writing to the bucket
     * @param awsRegion     region of the bucket
     * @param dlqPathPrefix optional key prefix; a trailing '/' is enforced when non-empty
     */
    public DlqPushHandler(
            final PluginFactory pluginFactory,
            final String bucket,
            final String stsRoleArn,
            final String awsRegion,
            final String dlqPathPrefix) {

        this.dlqProvider = getDlqProvider(pluginFactory, bucket, stsRoleArn, awsRegion, dlqPathPrefix);
    }

    /**
     * Routes failed data to the local DLQ file when one is configured,
     * otherwise to S3.
     */
    public void perform(final PluginSetting pluginSetting,
                        final Object failedData) {
        if (dlqFile != null)
            writeToFile(failedData);
        else
            pushToS3(pluginSetting, failedData);
    }

    /** Appends the failed record as one JSON line to the configured DLQ file. */
    private void writeToFile(final Object failedData) {
        try (BufferedWriter dlqFileWriter = Files.newBufferedWriter(Paths.get(dlqFile),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            dlqFileWriter.write(objectWriter.writeValueAsString(failedData) + "\n");
        } catch (IOException e) {
            LOG.error("Exception while writing failed data to DLQ file Exception: ", e);
        }
    }

    /** Wraps the failed record in a DlqObject and hands it to the s3 DLQ writer. */
    private void pushToS3(final PluginSetting pluginSetting, final Object failedData) {
        final DlqWriter dlqWriter = getDlqWriter(pluginSetting.getPipelineName());
        try {
            final String pluginId = randomUUID().toString();
            final DlqObject dlqObject = DlqObject.builder()
                    .withPluginId(pluginId)
                    .withPluginName(pluginSetting.getName())
                    .withPipelineName(pluginSetting.getPipelineName())
                    .withFailedData(failedData)
                    .build();
            final List<DlqObject> dlqObjects = Arrays.asList(dlqObject);
            dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginId);
            LOG.info("wrote {} events to DLQ", dlqObjects.size());
        } catch (final IOException e) {
            LOG.error("Exception while writing failed data to DLQ, Exception : ", e);
        }
    }

    private DlqWriter getDlqWriter(final String writerPipelineName) {
        final Optional<DlqWriter> potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER)
                .add(writerPipelineName).toString());
        return potentialDlq.orElse(null);
    }

    private DlqProvider getDlqProvider(final PluginFactory pluginFactory,
                                       final String bucket,
                                       final String stsRoleArn,
                                       final String awsRegion,
                                       final String dlqPathPrefix) {
        final Map<String, Object> props = new HashMap<>();
        props.put(BUCKET, bucket);
        props.put(ROLE_ARN, stsRoleArn);
        props.put(REGION, awsRegion);
        this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix);
        // Fix: the original put the RAW dlqPathPrefix here, silently discarding
        // the trailing-delimiter normalization computed on the previous line.
        props.put(KEY_PATH_PREFIX, keyPathPrefix);
        final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
        return pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting);
    }

    /** Ensures a non-empty prefix ends with '/', so S3 keys nest under it. */
    private String enforceDefaultDelimiterOnKeyPathPrefix(final String keyPathPrefix) {
        return (keyPathPrefix.charAt(keyPathPrefix.length() - 1) == '/') ?
                keyPathPrefix : keyPathPrefix.concat("/");
    }
}

// ---- LambdaSinkFailedDlqData.java ----

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.dataprepper.plugins.sink.lambda.dlq;

import com.fasterxml.jackson.core.JsonProcessingException;
import software.amazon.awssdk.core.SdkBytes;


/**
 * DLQ record for a failed Lambda invocation: the request payload plus the
 * failure message and HTTP status code.
 */
public class LambdaSinkFailedDlqData {

    private SdkBytes payload;

    private String message;

    private int status;

    // NOTE(review): nothing in this constructor can actually throw
    // JsonProcessingException; the throws clause is kept for caller
    // compatibility but could be removed in a follow-up.
    public LambdaSinkFailedDlqData(SdkBytes payload, String message, int status) throws JsonProcessingException {
        this.payload = payload;
        this.message = message;
        this.status = status;
    }

    public SdkBytes getPayload() {
        return payload;
    }

    /** Fluent setter; returns {@code this} for chaining. */
    public LambdaSinkFailedDlqData setPayload(SdkBytes payload) {
        this.payload = payload;
        return this;
    }

    public String getMessage() {
        return message;
    }

    /** Fluent setter; returns {@code this} for chaining. */
    public LambdaSinkFailedDlqData setMessage(String message) {
        this.message = message;
        return this;
    }

    public int getStatus() {
        return status;
    }

    /** Fluent setter; returns {@code this} for chaining. */
    public LambdaSinkFailedDlqData setStatus(int status) {
        this.status = status;
        return this;
    }

    @Override
    public String toString() {

        return "failedData\n" +
                "payload    \"" + payload.asUtf8String() + "\"\n" +
                "message    \"" + message + "\"\n" +
                "status     \"" + status + "\n";
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.dataprepper.plugins.sink.lambda;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.LambdaClientBuilder;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Map;
import java.util.UUID;

/**
 * Unit tests for {@code LambdaClientFactory#createLambdaClient}: one smoke test
 * against the real SDK builder, and one parameterized test that statically mocks
 * {@link LambdaClient#builder()} to verify the region, credentials provider, and
 * STS options the factory passes through.
 */
@ExtendWith(MockitoExtension.class)
class LambdaClientFactoryTest {
    @Mock
    private LambdaSinkConfig lambdaSinkConfig;
    @Mock
    private AwsCredentialsSupplier awsCredentialsSupplier;

    @Mock
    private AwsAuthenticationOptions awsAuthenticationOptions;

    @BeforeEach
    void setUp() {
        // All tests read the aws block through the sink config mock.
        when(lambdaSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
    }

    /** Smoke test: the factory builds a non-null client with the real SDK builder. */
    @Test
    void createLambdaClient_with_real_LambdaClient() {
        when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);
        final LambdaClient lambdaClient = LambdaClientFactory.createLambdaClient(lambdaSinkConfig, awsCredentialsSupplier);

        assertThat(lambdaClient, notNullValue());
    }

    /**
     * Verifies, for several regions, that the factory forwards the configured
     * region/role/header-overrides to the credentials supplier and wires the
     * returned provider into the client builder. The static mock of
     * LambdaClient.builder() must be closed before the verifications run.
     */
    @ParameterizedTest
    @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"})
    void createlambdaClient_provides_correct_inputs(final String regionString) {
        final Region region = Region.of(regionString);
        final String stsRoleArn = UUID.randomUUID().toString();
        final Map<String, String> stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
        when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region);
        when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn);
        when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides);

        final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class);
        when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider);

        // Self-returning builder stubs so the factory's fluent chain works.
        final LambdaClientBuilder lambdaClientBuilder = mock(LambdaClientBuilder.class);
        when(lambdaClientBuilder.region(region)).thenReturn(lambdaClientBuilder);
        when(lambdaClientBuilder.credentialsProvider(any())).thenReturn(lambdaClientBuilder);
        when(lambdaClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(lambdaClientBuilder);
        try (final MockedStatic<LambdaClient> lambdaClientMockedStatic = mockStatic(LambdaClient.class)) {
            lambdaClientMockedStatic.when(LambdaClient::builder)
                    .thenReturn(lambdaClientBuilder);
            LambdaClientFactory.createLambdaClient(lambdaSinkConfig, awsCredentialsSupplier);
        }

        // The provider handed to the builder must be the one the supplier produced.
        final ArgumentCaptor<AwsCredentialsProvider> credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class);
        verify(lambdaClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture());

        final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue();

        assertThat(actualCredentialsProvider, equalTo(expectedCredentialsProvider));

        // And the supplier must have been asked with the configured region/role/headers.
        final ArgumentCaptor<AwsCredentialsOptions> optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class);
        verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture());

        final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue();
        assertThat(actualCredentialsOptions.getRegion(), equalTo(region));
        assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn));
        assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides));
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.dataprepper.plugins.sink.lambda;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.regions.Region;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;


/**
 * Unit tests for {@code LambdaSinkConfig}: deserializes YAML pipeline snippets
 * and checks defaults, aws options, and the DLQ region/role fallback behavior.
 */
class LambdaSinkConfigTest {
    public static final int DEFAULT_MAX_RETRIES = 3;
    // YAML mapper matching how Data Prepper parses pipeline configuration.
    private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));

    /** A bare config should fall back to the default retry count of 3. */
    @Test
    void lambda_sink_default_max_connection_retries_test(){
        assertThat(new LambdaSinkConfig().getMaxConnectionRetries(),equalTo(DEFAULT_MAX_RETRIES));
    }

    /** With an explicit dlq block, DLQ region/role come from the dlq s3 settings. */
    @Test
    void lambda_sink_pipeline_config_test() throws JsonProcessingException {
        final String config =
                "        function_name: test_function\n" +
                "        aws:\n" +
                "          region: ap-south-1\n" +
                "          sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" +
                "          sts_header_overrides: {\"test\":\"test\"}\n" +
                "        max_retries: 10\n" +
                "        dlq:\n" +
                "          s3:\n" +
                "            bucket: test\n" +
                "            key_path_prefix: test\n" +
                "            region: ap-south-1\n" +
                "            sts_role_arn: test-role-arn\n";
        final LambdaSinkConfig lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class);
        assertThat(lambdaSinkConfig.getMaxConnectionRetries(),equalTo(10));
        assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion(),equalTo(Region.AP_SOUTH_1));
        assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(),equalTo("arn:aws:iam::524239988912:role/app-test"));
        assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsHeaderOverrides().get("test"),equalTo("test"));
        assertThat(lambdaSinkConfig.getDlqStsRegion(),equalTo("ap-south-1"));
        assertThat(lambdaSinkConfig.getDlqStsRoleARN(),equalTo("test-role-arn"));
    }

    /**
     * Without a dlq block, the DLQ region/role should fall back to the aws
     * block's values, and the DLQ plugin settings should be absent.
     */
    @Test
    void lambda_sink_pipeline_config_test_with_no_dlq() throws JsonProcessingException {
        final String config =
                "        function_name: test_function\n" +
                "        aws:\n" +
                "          region: ap-south-1\n" +
                "          sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" +
                "          sts_header_overrides: {\"test\":\"test\"}\n" +
                "        max_retries: 10\n";
        final LambdaSinkConfig lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class);
        assertThat(lambdaSinkConfig.getMaxConnectionRetries(),equalTo(10));
        assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion(),equalTo(Region.AP_SOUTH_1));
        assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(),equalTo("arn:aws:iam::524239988912:role/app-test"));
        assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsHeaderOverrides().get("test"),equalTo("test"));
        assertThat(lambdaSinkConfig.getDlqStsRegion(),equalTo("ap-south-1"));
        assertThat(lambdaSinkConfig.getDlqStsRoleARN(),equalTo("arn:aws:iam::524239988912:role/app-test"));
        assertThat(lambdaSinkConfig.getDlqPluginSetting().get("key"),equalTo(null));
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */
package org.opensearch.dataprepper.plugins.sink.lambda;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBuffer;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions;
import org.opensearch.dataprepper.plugins.sink.lambda.config.ThresholdOptions;
import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.sink.lambda.dlq.LambdaSinkFailedDlqData;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;


import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

/**
 * Unit tests for {@code LambdaSinkService}: empty input, single/multiple record
 * success, retry exhaustion, DLQ routing on persistent failure, event-handle
 * release, and batch-key payload construction.
 */
public class LambdaSinkServiceTest {

    public static final int maxEvents = 10;
    public static final int maxRetries = 3;
    public static final String region = "us-east-1";
    public static final String maxSize = "1kb";
    public static final String functionName = "testFunction";
    public static final String invocationType = "event";
    public static final String batchKey ="lambda_batch_key";
    // Baseline YAML config shared by the happy-path tests.
    public static final String config =
            "        function_name: testFunction\n" +
            "        aws:\n" +
            "          region: us-east-1\n" +
            "          sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" +
            "          sts_header_overrides: {\"test\":\"test\"}\n" +
            "        max_retries: 10\n";

    private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));
    private LambdaSinkConfig lambdaSinkConfig;
    private LambdaClient lambdaClient;
    private PluginMetrics pluginMetrics;
    private Counter numberOfRecordsSuccessCounter;
    private Counter numberOfRecordsFailedCounter;
    private DlqPushHandler dlqPushHandler;
    private Buffer buffer;
    private BufferFactory bufferFactory;


    // NOTE(review): invokeResponse and sdkHttpResponse are never used by any
    // test; candidates for removal.
    private InvokeResponse invokeResponse;

    private SdkHttpResponse sdkHttpResponse;

    InvokeResponse mockResponse;

    @BeforeEach
    public void setup() throws IOException {
        this.lambdaClient = mock(LambdaClient.class);
        this.pluginMetrics = mock(PluginMetrics.class);
        this.buffer = mock(InMemoryBuffer.class);
        this.lambdaSinkConfig = mock(LambdaSinkConfig.class);
        this.numberOfRecordsSuccessCounter = mock(Counter.class);
        this.numberOfRecordsFailedCounter = mock(Counter.class);
        this.dlqPushHandler = mock(DlqPushHandler.class);
        this.bufferFactory = mock(BufferFactory.class);
        when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)).thenReturn(numberOfRecordsSuccessCounter);
        when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).thenReturn(numberOfRecordsFailedCounter);
        mockResponse = InvokeResponse.builder()
                .statusCode(200) // HTTP 200 for successful invocation
                .payload(SdkBytes.fromString("{\"key\": \"value\"}", java.nio.charset.StandardCharsets.UTF_8))
                .build();
    }

    /** Builds a service from an already-constructed (usually mocked) config, with a real in-memory buffer. */
    private LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig) throws IOException {
        bufferFactory = new InMemoryBufferFactory();
        buffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType);
        return new LambdaSinkService(lambdaClient,
                lambdaSinkConfig,
                pluginMetrics,
                mock(PluginFactory.class),
                mock(PluginSetting.class),
                mock(OutputCodecContext.class),
                mock(AwsCredentialsSupplier.class),
                dlqPushHandler,
                bufferFactory);
    }

    /** Builds a service by deserializing a YAML config string, with a real in-memory buffer. */
    private LambdaSinkService createObjectUnderTest(String config) throws IOException {
        this.lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class);
        bufferFactory = new InMemoryBufferFactory();
        buffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType);
        return new LambdaSinkService(lambdaClient,
                lambdaSinkConfig,
                pluginMetrics,
                mock(PluginFactory.class),
                mock(PluginSetting.class),
                mock(OutputCodecContext.class),
                mock(AwsCredentialsSupplier.class),
                dlqPushHandler,
                bufferFactory);
    }

    /** An empty batch must not invoke Lambda or touch any counter. */
    @Test
    public void lambda_sink_test_with_empty_payload_records() throws IOException {
        numberOfRecordsSuccessCounter = mock(Counter.class);
        LambdaSinkService lambdaSinkService = createObjectUnderTest(config);
        lambdaSinkService.output(List.of());
        verifyNoInteractions(lambdaClient);
        verifyNoInteractions(numberOfRecordsSuccessCounter);
        verifyNoInteractions(numberOfRecordsFailedCounter);
    }


    /** One record → one invoke with the configured function name and Event invocation type. */
    @Test
    public void lambda_sink_test_with_single_record_success_push_to_lambda() throws IOException {
        LambdaSinkService lambdaSinkService = createObjectUnderTest(config);

        Map<String, Object> map = new HashMap<>();
        map.put("query1","test1");
        map.put("query2","test2");

        final Record<Event> eventRecord = new Record<>(JacksonEvent.builder().withData(map).withEventType("event").build());
        Collection<Record<Event>> records = List.of(eventRecord);
        final ArgumentCaptor<InvokeRequest> invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class);

        when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse);
        lambdaSinkService.output(records);

        verify(lambdaClient).invoke(invokeRequestCaptor.capture());
        final InvokeRequest actualRequest = invokeRequestCaptor.getValue();
        assertEquals(actualRequest.functionName(), "testFunction");
        assertEquals(actualRequest.invocationType().toString(), "Event");
        verify(numberOfRecordsSuccessCounter).increment(records.size());
    }

    /** With max_retries: 3, a persistently failing flush is attempted exactly 3 times. */
    @Test
    public void lambda_sink_test_max_retires_works() throws IOException {
        final String config =
                "        function_name: test_function\n" +
                "        aws:\n" +
                "          region: us-east-1\n" +
                "          sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" +
                "          sts_header_overrides: {\"test\":\"test\"}\n" +
                "        max_retries: 3\n";
        this.buffer = mock(InMemoryBuffer.class);
        when(lambdaClient.invoke(any(InvokeRequest.class))).thenThrow(AwsServiceException.class);
        doNothing().when(dlqPushHandler).perform(any(PluginSetting.class), any(LambdaSinkFailedDlqData.class));

        this.lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class);
        // Mocked buffer whose flushToLambda() always throws, to exhaust the retries.
        bufferFactory = mock(BufferFactory.class);
        buffer = mock(Buffer.class);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        when(buffer.getOutputStream()).thenReturn(byteArrayOutputStream);
        when(bufferFactory.getBuffer(any(LambdaClient.class),any(),any())).thenReturn(buffer);
        doThrow(AwsServiceException.class).when(buffer).flushToLambda();

        LambdaSinkService lambdaSinkService = new LambdaSinkService(lambdaClient,
                lambdaSinkConfig,
                pluginMetrics,
                mock(PluginFactory.class),
                mock(PluginSetting.class),
                mock(OutputCodecContext.class),
                mock(AwsCredentialsSupplier.class),
                dlqPushHandler,
                bufferFactory);

        final Record<Event> eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"));
        Collection<Record<Event>> records = List.of(eventRecord);
        lambdaSinkService.output(records);

        verify(buffer, times(3)).flushToLambda();
    }

    /** After retries are exhausted, the failed data must be pushed to the DLQ exactly once. */
    @Test
    public void lambda_sink_test_dlq_works() throws IOException {
        final String config =
                "        function_name: test_function\n" +
                "        aws:\n" +
                "          region: us-east-1\n" +
                "          sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" +
                "          sts_header_overrides: {\"test\":\"test\"}\n" +
                "        max_retries: 3\n";

        when(lambdaClient.invoke(any(InvokeRequest.class))).thenThrow(AwsServiceException.class);
        doNothing().when(dlqPushHandler).perform(any(PluginSetting.class), any(LambdaSinkFailedDlqData.class));

        this.lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class);
        bufferFactory = mock(BufferFactory.class);
        buffer = mock(Buffer.class);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        when(buffer.getOutputStream()).thenReturn(byteArrayOutputStream);
        when(bufferFactory.getBuffer(any(LambdaClient.class),any(),any())).thenReturn(buffer);

        doThrow(AwsServiceException.class).when(buffer).flushToLambda();

        LambdaSinkService lambdaSinkService = new LambdaSinkService(lambdaClient,
                lambdaSinkConfig,
                pluginMetrics,
                mock(PluginFactory.class),
                mock(PluginSetting.class),
                mock(OutputCodecContext.class),
                mock(AwsCredentialsSupplier.class),
                dlqPushHandler,
                bufferFactory);

        final Record<Event> eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"));
        Collection<Record<Event>> records = List.of(eventRecord);

        lambdaSinkService.output(records);

        verify(buffer, times(3)).flushToLambda();
        verify(dlqPushHandler,times(1)).perform(any(PluginSetting.class),any(Object.class));
    }

    /** Without batching configured, each record results in its own Lambda invoke. */
    @Test
    public void lambda_sink_test_with_multiple_record_success_push_to_lambda() throws IOException {
        LambdaSinkService lambdaSinkService = createObjectUnderTest(config);
        final Record<Event> eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"));
        Collection<Record<Event>> records = new ArrayList<>();
        int totalRecords = 11;
        for(int recordSize = 0; recordSize < totalRecords ; recordSize++) {
            records.add(eventRecord);
        }
        when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse);

        lambdaSinkService.output(records);

        verify(lambdaClient,times(totalRecords)).invoke(any(InvokeRequest.class));

    }

    /** A successful flush releases the event handle and increments the success counter. */
    @Test
    void lambda_sink_service_test_output_with_single_record_ack_release() throws IOException {
        final LambdaSinkService lambdaSinkService = createObjectUnderTest(config);
        final Event event = mock(Event.class);
        given(event.toJsonString()).willReturn("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}");
        given(event.getEventHandle()).willReturn(mock(EventHandle.class));

        final ArgumentCaptor<InvokeRequest> invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class);
        when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse);

        lambdaSinkService.output(List.of(new Record<>(event)));

        verify(lambdaClient,times(1)).invoke(invokeRequestCaptor.capture());
        final InvokeRequest actualRequest = invokeRequestCaptor.getValue();
        assertThat(actualRequest.functionName(), equalTo("testFunction"));
        assertThat(actualRequest.invocationType().toString(), equalTo("Event"));
        verify(numberOfRecordsSuccessCounter).increment(1);
    }

    /** With batch options set, the payload nests events under the configured batch key. */
    @Test
    public void lambda_sink_test_batch_enabled() throws IOException {
        when(lambdaSinkConfig.getFunctionName()).thenReturn(functionName);
        when(lambdaSinkConfig.getMaxConnectionRetries()).thenReturn(maxRetries);
        when(lambdaSinkConfig.getBatchOptions()).thenReturn(mock(BatchOptions.class));
        when(lambdaSinkConfig.getBatchOptions().getBatchKey()).thenReturn(batchKey);
        when(lambdaSinkConfig.getBatchOptions().getThresholdOptions()).thenReturn(mock(ThresholdOptions.class));
        when(lambdaSinkConfig.getBatchOptions().getThresholdOptions().getEventCount()).thenReturn(maxEvents);
        when(lambdaSinkConfig.getBatchOptions().getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse(maxSize));
        when(lambdaSinkConfig.getBatchOptions().getThresholdOptions().getEventCollectTimeOut()).thenReturn(Duration.ofNanos(10L));
        when(lambdaSinkConfig.getAwsAuthenticationOptions()).thenReturn(mock(AwsAuthenticationOptions.class));

        LambdaSinkService lambdaSinkService = createObjectUnderTest(lambdaSinkConfig);

        Map<String, Object> map = new HashMap<>();
        map.put("query1","test1");
        map.put("query2","test2");

        String expected_payload = "{\"lambda_batch_key\":[{\"query1\":\"test1\",\"query2\":\"test2\"}]}";
        final Record<Event> eventRecord = new Record<>(JacksonEvent.builder().withData(map).withEventType("event").build());
        Collection<Record<Event>> records = List.of(eventRecord);
        final ArgumentCaptor<InvokeRequest> invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class);

        when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse);
        lambdaSinkService.output(records);

        verify(lambdaClient).invoke(invokeRequestCaptor.capture());
        final InvokeRequest actualRequest = invokeRequestCaptor.getValue();
        assertEquals(actualRequest.functionName(), functionName);
        assertEquals(actualRequest.invocationType().toString(), "Event");
        String actualRequestPayload = actualRequest.payload().asUtf8String();
        assertEquals(actualRequestPayload, expected_payload );
        verify(numberOfRecordsSuccessCounter).increment(records.size());
    }
}
a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java new file mode 100644 index 0000000000..1687cbd285 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class LambdaSinkTest { + + public static final String S3_REGION = "us-east-1"; + public static final String CODEC_PLUGIN_NAME = "json"; + public static final String SINK_PLUGIN_NAME = "lambda"; + public static final String SINK_PIPELINE_NAME = "lambda-sink-pipeline"; + private LambdaSinkConfig lambdaSinkConfig; + private LambdaSink lambdaSink; + private PluginSetting pluginSetting; + private PluginFactory pluginFactory; + private AwsCredentialsSupplier awsCredentialsSupplier; + private SinkContext sinkContext; 
+ + @BeforeEach + void setUp() { + lambdaSinkConfig = mock(LambdaSinkConfig.class); + sinkContext = mock(SinkContext.class); + AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + pluginSetting = mock(PluginSetting.class); + PluginModel pluginModel = mock(PluginModel.class); + pluginFactory = mock(PluginFactory.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); + Map dlqMap = mock(HashMap.class); + LambdaClient lambdaClient = mock(LambdaClient.class); + + + when(lambdaSinkConfig.getDlq()).thenReturn(pluginModel); + when(pluginModel.getPluginSettings()).thenReturn(dlqMap); + when(lambdaSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION)); + when(pluginModel.getPluginName()).thenReturn(CODEC_PLUGIN_NAME); + when(pluginSetting.getName()).thenReturn(SINK_PLUGIN_NAME); + when(pluginSetting.getPipelineName()).thenReturn(SINK_PIPELINE_NAME); + } + + private LambdaSink createObjectUnderTest() { + return new LambdaSink(pluginSetting, lambdaSinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier); + } + + @Test + void test_lambda_sink_plugin_isReady_positive() { + lambdaSink = createObjectUnderTest(); + Assertions.assertNotNull(lambdaSink); + Assertions.assertNotNull(lambdaSinkConfig); + lambdaSink.doInitialize(); + assertTrue(lambdaSink.isReady(), "lambda sink is not initialized and not ready to work"); + } + + @Test + void test_lambda_sink_plugin_isReady_negative() { + lambdaSink = createObjectUnderTest(); + Assertions.assertNotNull(lambdaSink); + assertFalse(lambdaSink.isReady(), "lambda sink is initialized and ready to work"); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java new file mode 100644 
index 0000000000..b63553911a --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java @@ -0,0 +1,126 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; + +import java.io.IOException; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ThresholdCheckTest { + + @Mock(lenient = true) + private Buffer buffer; + private int maxEvents; + private ByteCount maxBytes; + private Duration maxCollectionDuration; + private Boolean isBatchEnabled; + + @BeforeEach + void setUp() { + maxEvents = 10_000; + maxBytes = ByteCount.parse("48mb"); + maxCollectionDuration = Duration.ofMinutes(5); + isBatchEnabled = true; + } + + @Test + void test_exceedThreshold_true_dueTo_maxEvents_is_greater_than_buffered_event_count() throws IOException { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents + 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertTrue(isThresholdExceed, "Threshold exceeded"); + } + + @Test + void test_exceedThreshold_false_dueTo_maxEvents_is_less_than_buffered_event_count() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 
1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(this.maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertFalse(isThresholdExceed, "Threshold exceeded"); + } + + @Test + void test_exceedThreshold_true_dueTo_maxBytes_is_greater_than_buffered_byte_count() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() + 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertTrue(isThresholdExceed, "Threshold not exceeded"); + } + + @Test + void test_exceedThreshold_false_dueTo_maxBytes_is_less_than_buffered_byte_count() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertFalse(isThresholdExceed, "Threshold exceeded"); + } + + @Test + void test_exceedThreshold_true_dueTo_maxCollectionDuration_is_greater_than_buffered_event_collection_duration() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.plusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertTrue(isThresholdExceed, "Threshold not exceeded"); + } + + @Test + void 
test_exceedThreshold_false_dueTo_maxCollectionDuration_is_less_than_buffered_event_collection_duration() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + + assertFalse(isThresholdExceed, "Threshold exceeded"); + } + + @Test + void test_exceedThreshold_when_batch_is_disabled() throws IOException { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents + 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + Boolean isBatchEnabled = false; + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertTrue(isThresholdExceed); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java new file mode 100644 index 0000000000..d161b28bb0 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumulator; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static
org.hamcrest.MatcherAssert.assertThat; + +class InMemoryBufferFactoryTest { + + @Test + void test_inMemoryBufferFactory_notNull(){ + InMemoryBufferFactory inMemoryBufferFactory = new InMemoryBufferFactory(); + Assertions.assertNotNull(inMemoryBufferFactory); + } + + @Test + void test_buffer_notNull(){ + InMemoryBufferFactory inMemoryBufferFactory = new InMemoryBufferFactory(); + Assertions.assertNotNull(inMemoryBufferFactory); + Buffer buffer = inMemoryBufferFactory.getBuffer(null, null, null); + Assertions.assertNotNull(buffer); + assertThat(buffer, instanceOf(Buffer.class)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java new file mode 100644 index 0000000000..478650a300 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java @@ -0,0 +1,167 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumulator; + +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBuffer; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.model.InvokeRequest; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; + +import java.io.IOException; +import 
java.io.OutputStream; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class InMemoryBufferTest { + + public static final int MAX_EVENTS = 55; + @Mock + private LambdaClient lambdaClient; + + private final String functionName = "testFunction"; + + private final String invocationType = "Event"; + + private InMemoryBuffer inMemoryBuffer; + + @Test + void test_with_write_event_into_buffer() throws IOException { + inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); + + while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { + OutputStream outputStream = inMemoryBuffer.getOutputStream(); + outputStream.write(generateByteArray()); + int eventCount = inMemoryBuffer.getEventCount() +1; + inMemoryBuffer.setEventCount(eventCount); + } + assertThat(inMemoryBuffer.getSize(), greaterThanOrEqualTo(54110L)); + assertThat(inMemoryBuffer.getEventCount(), equalTo(MAX_EVENTS)); + assertThat(inMemoryBuffer.getDuration(), notNullValue()); + assertThat(inMemoryBuffer.getDuration(), greaterThanOrEqualTo(Duration.ZERO)); + + } + + @Test + @Disabled("unstable") + /** + * There are 5 checkpoints in the tests as below + * |-----------upperBoundDuration-------------| + * startTime --- stopWatchStart --- endTime --- checkpoint --- stopwatchGetDuration + * |-lowerBoundDuration-| + * |------------inMemoryBuffer.Duration-------------| + * This test assumes the 
startTime and stopWatchStart are same, checkpoint and stopwatchGetDuration are same. + * However, they are not true at some systems. + */ + void getDuration_provides_duration_within_expected_range() throws IOException, InterruptedException { + Instant startTime = Instant.now(); + inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); + Instant endTime = Instant.now(); + + + while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { + OutputStream outputStream = inMemoryBuffer.getOutputStream(); + outputStream.write(generateByteArray()); + int eventCount = inMemoryBuffer.getEventCount() +1; + inMemoryBuffer.setEventCount(eventCount); + } + Thread.sleep(100); + + Instant durationCheckpointTime = Instant.now(); + Duration duration = inMemoryBuffer.getDuration(); + assertThat(duration, notNullValue()); + + Duration upperBoundDuration = Duration.between(startTime, durationCheckpointTime).truncatedTo(ChronoUnit.MILLIS); + Duration lowerBoundDuration = Duration.between(endTime, durationCheckpointTime).truncatedTo(ChronoUnit.MILLIS); + assertThat(duration, greaterThanOrEqualTo(lowerBoundDuration)); + assertThat(duration, lessThanOrEqualTo(upperBoundDuration)); + } + + @Test + void test_with_write_event_into_buffer_and_flush_toLambda() throws IOException { + + // Mock the response of the invoke method + InvokeResponse mockResponse = InvokeResponse.builder() + .statusCode(200) // HTTP 200 for successful invocation + .payload(SdkBytes.fromString("{\"key\": \"value\"}", java.nio.charset.StandardCharsets.UTF_8)) + .build(); + when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse); + + inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); + while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { + OutputStream outputStream = inMemoryBuffer.getOutputStream(); + outputStream.write(generateByteArray()); + int eventCount = inMemoryBuffer.getEventCount() +1; + inMemoryBuffer.setEventCount(eventCount); + } + 
assertDoesNotThrow(() -> { + inMemoryBuffer.flushToLambda(); + }); + } + + @Test + void test_uploadedToLambda_success() throws IOException { + // Mock the response of the invoke method + InvokeResponse mockResponse = InvokeResponse.builder() + .statusCode(200) // HTTP 200 for successful invocation + .payload(SdkBytes.fromString("{\"key\": \"value\"}", java.nio.charset.StandardCharsets.UTF_8)) + .build(); + when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse); + inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); + Assertions.assertNotNull(inMemoryBuffer); + OutputStream outputStream = inMemoryBuffer.getOutputStream(); + outputStream.write(generateByteArray()); + assertDoesNotThrow(() -> { + inMemoryBuffer.flushToLambda(); + }); + } + + @Test + void test_uploadedToLambda_fails() { + // Mock the response of the invoke method + InvokeResponse mockResponse = InvokeResponse.builder() + .statusCode(200) // HTTP 200 for successful invocation + .payload(SdkBytes.fromString("{\"key\": \"value\"}", java.nio.charset.StandardCharsets.UTF_8)) + .build(); + SdkClientException sdkClientException = mock(SdkClientException.class); + when(lambdaClient.invoke(any(InvokeRequest.class))) + .thenThrow(sdkClientException); + inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); + + Assertions.assertNotNull(inMemoryBuffer); + SdkClientException actualException = assertThrows(SdkClientException.class, () -> inMemoryBuffer.flushToLambda()); + assertThat(actualException, Matchers.equalTo(sdkClientException)); + } + + private byte[] generateByteArray() { + byte[] bytes = new byte[1000]; + for (int i = 0; i < 1000; i++) { + bytes[i] = (byte) i; + } + return bytes; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodecTest.java 
b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodecTest.java new file mode 100644 index 0000000000..6de6ce8a0e --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/LambdaJsonCodecTest.java @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.codec; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeType; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class LambdaJsonCodecTest { + private ByteArrayOutputStream outputStream; + + private LambdaJsonCodec createObjectUnderTest() { + String key = "event"; + return new LambdaJsonCodec(key); + } + + @Test + void test_happy_case_with_null_codec_key() throws IOException { + LambdaJsonCodec LambdaJsonCodec = new LambdaJsonCodec(null); + + outputStream = new ByteArrayOutputStream(); + OutputCodecContext codecContext = new OutputCodecContext(); + LambdaJsonCodec.start(outputStream, null, codecContext); + + final List> expectedData = generateRecords(1); + final Event event = convertToEvent(expectedData.get(0)); + LambdaJsonCodec.writeEvent(event, outputStream); + LambdaJsonCodec.complete(outputStream); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = 
mapper.readTree(outputStream.toByteArray()); + assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + assertEquals(jsonNode.toString(),"{\"name\":\"Person0\",\"age\":0}"); + } + + + @Test + void test_happy_case_with_codec_key() throws IOException { + String key = "events"; + final int numberOfRecords = 2; + LambdaJsonCodec LambdaJsonCodec = new LambdaJsonCodec(key); + + outputStream = new ByteArrayOutputStream(); + OutputCodecContext codecContext = new OutputCodecContext(); + LambdaJsonCodec.start(outputStream, null, codecContext); + + final List> expectedData = generateRecords(numberOfRecords); + for (int index = 0; index < numberOfRecords; index++) { + final Event event = convertToEvent(expectedData.get(index)); + LambdaJsonCodec.writeEvent(event, outputStream); + } + LambdaJsonCodec.complete(outputStream); + + String expectedString = "{\"events\":[{\"name\":\"Person0\",\"age\":0},{\"name\":\"Person1\",\"age\":1}]}"; + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree(outputStream.toByteArray()); + assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + assertEquals(jsonNode.toString(),expectedString); + } + + private static Event convertToEvent(Map data) { + return JacksonLog.builder().withData(data).build(); + } + + private static List> generateRecords(int numberOfRecords) { + + List> recordList = new ArrayList<>(); + + for (int rows = 0; rows < numberOfRecords; rows++) { + + Map eventData = new HashMap<>(); + + eventData.put("name", "Person" + rows); + eventData.put("age", rows); + recordList.add(eventData); + + } + + return recordList; + } + + + private Object getValue(JsonNode jsonNode) { + if(jsonNode.isTextual()) + return jsonNode.asText(); + + if(jsonNode.isInt()) + return jsonNode.asInt(); + + throw new RuntimeException("Test not setup correctly."); + } +} \ No newline at end of file diff --git 
a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java new file mode 100644 index 0000000000..53bd0a4edf --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.config; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class ThresholdOptionsTest { + private static final String DEFAULT_BYTE_CAPACITY = "6mb"; + private static final int DEFAULT_EVENT_COUNT = 0; + + @Test + void test_default_byte_capacity_test() { + assertThat(new ThresholdOptions().getMaximumSize().getBytes(), + equalTo(ByteCount.parse(DEFAULT_BYTE_CAPACITY).getBytes())); + } + + @Test + void test_get_event_collection_duration_test() { + assertThat(new ThresholdOptions().getEventCollectTimeOut(), equalTo(null)); + } + + @Test + void test_get_event_count_test() { + assertThat(new ThresholdOptions().getEventCount(), equalTo(DEFAULT_EVENT_COUNT)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java new file mode 100644 index 0000000000..17f39973b7 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * 
SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.lambda.dlq; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.plugins.dlq.DlqProvider; +import org.opensearch.dataprepper.plugins.dlq.DlqWriter; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import software.amazon.awssdk.core.SdkBytes; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class DlqPushHandlerTest { + + private static final String BUCKET = "bucket"; + private static final String BUCKET_VALUE = "test"; + private static final String ROLE = "arn:aws:iam::524239988122:role/app-test"; + + private static final String REGION = "ap-south-1"; + private static final String S3_PLUGIN_NAME = "s3"; + private static final String KEY_PATH_PREFIX = "key_path_prefix"; + + private static final String KEY_PATH_PREFIX_VALUE = "dlq/"; + + private static final String PIPELINE_NAME = "log-pipeline"; + + private static final String DLQ_FILE = "local_dlq_file"; + + private PluginModel pluginModel; + + private DlqPushHandler dlqPushHandler; + private PluginFactory pluginFactory; + + private AwsAuthenticationOptions awsAuthenticationOptions; + + private DlqProvider dlqProvider; + + private DlqWriter dlqWriter; + + + @BeforeEach + public void setUp(){ + 
this.pluginFactory = mock(PluginFactory.class); + this.pluginModel = mock(PluginModel.class); + this.awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + this.dlqProvider = mock(DlqProvider.class); + this.dlqWriter = mock(DlqWriter.class); + } + + @Test + void perform_for_dlq_s3_success() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SdkBytes payload = SdkBytes.fromUtf8String("{\"name\":\"dataprepper\"}"); + LambdaSinkFailedDlqData failedDlqData = new LambdaSinkFailedDlqData(payload,"message",0); + dlqPushHandler = new DlqPushHandler(pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..23c33feb6d --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1,3 @@ +# To enable mocking of final classes with vanilla Mockito +# https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods +mock-maker-inline diff --git 
a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java index 224d8b0f54..f74685ddd8 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnection.java @@ -13,7 +13,7 @@ import java.util.Objects; public class MongoDBConnection { - private static final String MONGO_CONNECTION_STRING_TEMPLATE = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s&directConnection=%s"; + private static final String MONGO_CONNECTION_STRING_TEMPLATE = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s&directConnection=%s"; public static MongoClient getMongoClient(final MongoDBSourceConfig sourceConfig) { @@ -25,7 +25,7 @@ public static MongoClient getMongoClient(final MongoDBSourceConfig sourceConfig) if (Objects.nonNull(sourceConfig.getTrustStoreFilePath())) { final File truststoreFilePath = new File(sourceConfig.getTrustStoreFilePath()); settingBuilder.applyToSslSettings(builder -> { - builder.enabled(sourceConfig.getTls()); + builder.enabled(sourceConfig.getInsecure()); builder.invalidHostNameAllowed(sourceConfig.getSslInsecureDisableVerification()); builder.context(TrustStoreProvider.createSSLContext(truststoreFilePath.toPath(), sourceConfig.getTrustStorePassword())); @@ -61,11 +61,11 @@ private static String getConnectionString(final MongoDBSourceConfig sourceConfig // Support for only single host final String hostname = sourceConfig.getHost(); final int port = sourceConfig.getPort(); - final String tls = sourceConfig.getTls().toString(); + final String ssl = sourceConfig.getInsecure().toString(); final String invalidHostAllowed = 
sourceConfig.getSslInsecureDisableVerification().toString(); final String readPreference = sourceConfig.getReadPreference(); final String directionConnection = sourceConfig.getDirectConnection().toString(); return String.format(MONGO_CONNECTION_STRING_TEMPLATE, username, password, hostname, port, - readPreference, tls, invalidHostAllowed, directionConnection); + readPreference, ssl, invalidHostAllowed, directionConnection); } } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java index 37b7ec6716..22350498ad 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/configuration/MongoDBSourceConfig.java @@ -164,4 +164,4 @@ public String getPassword() { } } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java index 30bc721c49..b672660ee7 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/client/MongoDBConnectionTest.java @@ -38,7 +38,7 @@ void setUp() { when(authenticationConfig.getPassword()).thenReturn("aЯ ⾀sd?q=%%l€0£.lo" + UUID.randomUUID()); when(mongoDBSourceConfig.getHost()).thenReturn(UUID.randomUUID().toString()); when(mongoDBSourceConfig.getPort()).thenReturn(getRandomInteger()); - when(mongoDBSourceConfig.getTls()).thenReturn(getRandomBoolean()); + 
when(mongoDBSourceConfig.getInsecure()).thenReturn(getRandomBoolean()); when(mongoDBSourceConfig.getSslInsecureDisableVerification()).thenReturn(getRandomBoolean()); when(mongoDBSourceConfig.getReadPreference()).thenReturn("secondaryPreferred"); } @@ -85,7 +85,7 @@ public void getMongoClientEmptyHost() { } private Boolean getRandomBoolean() { - return random.nextBoolean(); + return random.nextBoolean(); } private int getRandomInteger() { diff --git a/settings.gradle b/settings.gradle index 547551d2bf..d313c019c0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -168,6 +168,7 @@ include 'data-prepper-plugins:dissect-processor' include 'data-prepper-plugins:dynamodb-source' include 'data-prepper-plugins:decompress-processor' include 'data-prepper-plugins:split-event-processor' -include 'data-prepper-plugins:http-common' include 'data-prepper-plugins:flatten-processor' include 'data-prepper-plugins:mongodb' +include 'data-prepper-plugins:http-common' +include 'data-prepper-plugins:lambda-sink'