From 97c5affc70bf34df5676d44d229ce87a4f18430d Mon Sep 17 00:00:00 2001 From: srigovs Date: Thu, 22 Feb 2024 13:00:13 -0800 Subject: [PATCH] Add support for lambda sink Signed-off-by: srigovs --- data-prepper-plugins/lambda-sink/build.gradle | 62 +++ .../sink/lambda/LambdaSinkServiceIT.java | 218 +++++++++++ .../sink/lambda/LambdaClientFactory.java | 46 +++ .../plugins/sink/lambda/LambdaSink.java | 105 +++++ .../plugins/sink/lambda/LambdaSinkConfig.java | 89 +++++ .../sink/lambda/LambdaSinkService.java | 237 ++++++++++++ .../plugins/sink/lambda/ThresholdCheck.java | 33 ++ .../sink/lambda/accumlator/Buffer.java | 31 ++ .../sink/lambda/accumlator/BufferFactory.java | 14 + .../lambda/accumlator/InMemoryBuffer.java | 112 ++++++ .../accumlator/InMemoryBufferFactory.java | 16 + .../plugins/sink/lambda/codec/JsonCodec.java | 79 ++++ .../config/AwsAuthenticationOptions.java | 46 +++ .../sink/lambda/config/BatchOptions.java | 27 ++ .../sink/lambda/config/ThresholdOptions.java | 60 +++ .../sink/lambda/dlq/DlqPushHandler.java | 131 +++++++ .../lambda/dlq/LambdaSinkFailedDlqData.java | 60 +++ .../sink/lambda/LambdaClientFactoryTest.java | 98 +++++ .../sink/lambda/LambdaSinkConfigTest.java | 76 ++++ .../sink/lambda/LambdaSinkServiceTest.java | 358 ++++++++++++++++++ .../plugins/sink/lambda/LambdaSinkTest.java | 83 ++++ .../sink/lambda/ThresholdCheckTest.java | 126 ++++++ .../InMemoryBufferFactoryTest.java | 32 ++ .../accumulator/InMemoryBufferTest.java | 167 ++++++++ .../sink/lambda/codec/JsonCodecTest.java | 111 ++++++ .../lambda/config/ThresholdOptionsTest.java | 33 ++ .../sink/lambda/dlq/DlqPushHandlerTest.java | 88 +++++ .../org.mockito.plugins.MockMaker | 3 + settings.gradle | 1 + 29 files changed, 2542 insertions(+) create mode 100644 data-prepper-plugins/lambda-sink/build.gradle create mode 100644 data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/JsonCodec.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java create mode 100644 data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/JsonCodecTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java create mode 100644 data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/data-prepper-plugins/lambda-sink/build.gradle b/data-prepper-plugins/lambda-sink/build.gradle new file mode 100644 index 0000000000..eecb4a6fc1 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/build.gradle @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +dependencies { + implementation project(':data-prepper-api') + implementation project(path: ':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:aws-plugin-api') + implementation project(':data-prepper-plugins:failures-common') + implementation 'io.micrometer:micrometer-core' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'software.amazon.awssdk:lambda:2.17.99' + implementation 'software.amazon.awssdk:sdk-core:2.x.x' + implementation 'software.amazon.awssdk:sts' + implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + implementation'org.json:json' + implementation libs.commons.lang3 + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + testImplementation project(':data-prepper-test-common') + testImplementation project(':data-prepper-plugins:parse-json-processor') +} + +test { + useJUnitPlatform() +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + + systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties' + systemProperty 'tests.lambda.sink.region', System.getProperty('tests.lambda.sink.region') + systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName') + + filter { + includeTestsMatching '*IT' + } +} diff --git a/data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java b/data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java new file mode 100644 index 0000000000..21bdb7cd72 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceIT.java @@ -0,0 +1,218 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.config.ThresholdOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class LambdaSinkServiceIT { + + private LambdaClient lambdaClient; + private String functionName; + private String lambdaRegion; + private BufferFactory bufferFactory; + @Mock + private LambdaSinkConfig lambdaSinkConfig; + @Mock + private BatchOptions batchOptions; + @Mock + private ThresholdOptions thresholdOptions; + @Mock + private AwsAuthenticationOptions awsAuthenticationOptions; + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock + private PluginMetrics pluginMetrics; + @Mock + private DlqPushHandler dlqPushHandler; + @Mock + private PluginFactory pluginFactory; + @Mock + private PluginSetting pluginSetting; + @Mock + private Counter numberOfRecordsSuccessCounter; + @Mock + private Counter numberOfRecordsFailedCounter; + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + private String stsRoleArn; + + final String LAMBDA_SINK_CONFIG_YAML = + " function_name: lambda_test_function\n" + + " aws:\n" + + " region: us-east-1\n" + + " sts_role_arn: arn:aws:iam::176893235612:role/osis-s3-opensearch-role\n" + + " sync: true\n" + + " max_retries: 3\n"; + + + @BeforeEach + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this); + lambdaRegion = System.getProperty("tests.lambda.sink.region"); + functionName = System.getProperty("tests.lambda.sink.functionName"); + + final Region region = Region.of(lambdaRegion); + + lambdaClient = LambdaClient.builder() + .region(Region.of(lambdaRegion)) + .build(); + + bufferFactory = new InMemoryBufferFactory(); + + when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)). + thenReturn(numberOfRecordsSuccessCounter); + when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)). + thenReturn(numberOfRecordsFailedCounter); + } + + + private static Record createRecord() { + final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build(); + return new Record<>(event); + } + + public LambdaSinkService createObjectUnderTest(final String config) throws JsonProcessingException { + + final LambdaSinkConfig lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class); + OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList()); + pluginFactory = null; + return new LambdaSinkService(lambdaClient, + lambdaSinkConfig, + pluginMetrics, + pluginFactory, + pluginSetting, + codecContext, + awsCredentialsSupplier, + dlqPushHandler, + bufferFactory); + } + + public LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig) throws JsonProcessingException { + + OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList()); + pluginFactory = null; + return new LambdaSinkService(lambdaClient, + lambdaSinkConfig, + pluginMetrics, + pluginFactory, + pluginSetting, + codecContext, + awsCredentialsSupplier, + dlqPushHandler, + bufferFactory); + } + + + private static Collection> generateRecords(int numberOfRecords) { + List> recordList = new ArrayList<>(); + + for (int rows = 0; rows < numberOfRecords; rows++) { + HashMap eventData = new HashMap<>(); + eventData.put("name", "Person" + rows); + eventData.put("age", Integer.toString(rows)); + + Record eventRecord = new Record<>(JacksonEvent.builder().withData(eventData).withEventType("event").build()); + recordList.add(eventRecord); + } + return recordList; + } + + @ParameterizedTest + @ValueSource(ints = {1,5}) + void verify_flushed_records_to_lambda_success(final int recordCount) throws Exception { + LambdaSinkService objectUnderTest = createObjectUnderTest(LAMBDA_SINK_CONFIG_YAML); + + Collection> recordsData = generateRecords(recordCount); + objectUnderTest.output(recordsData); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + + verify(numberOfRecordsSuccessCounter, times(recordCount)).increment(1); + } + + @ParameterizedTest + @ValueSource(ints = {1,5,10}) + void verify_flushed_records_to_lambda_failed_and_dlq_works(final int recordCount) throws Exception { + final String LAMBDA_SINK_CONFIG_INVALID_FUNCTION_NAME = + " function_name: $$$\n" + + " aws:\n" + + " region: us-east-1\n" + + " sts_role_arn: arn:aws:iam::176893235612:role/osis-s3-opensearch-role\n" + + " sync: true\n" + + " max_retries: 3\n" + + " dlq: #any failed even\n"+ + " s3:\n"+ + " bucket: test-bucket\n"+ + " key_path_prefix: dlq/\n"; + LambdaSinkService objectUnderTest = createObjectUnderTest(LAMBDA_SINK_CONFIG_INVALID_FUNCTION_NAME); + + Collection> recordsData = generateRecords(recordCount); + objectUnderTest.output(recordsData); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + + verify( numberOfRecordsFailedCounter, times(recordCount)).increment(1); + } + + @ParameterizedTest + @ValueSource(ints = {2,5}) + void verify_flushed_records_with_batching_to_lambda(final int recordCount) throws JsonProcessingException, InterruptedException { + + int event_count = 2; + when(lambdaSinkConfig.getFunctionName()).thenReturn("lambda_test_function"); + when(lambdaSinkConfig.getMaxConnectionRetries()).thenReturn(3); + when(lambdaSinkConfig.getSync()).thenReturn(true); + when(thresholdOptions.getEventCount()).thenReturn(event_count); + when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb")); + when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s")); + when(batchOptions.getBatchKey()).thenReturn("lambda_batch_key"); + when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions); + when(lambdaSinkConfig.getBatchOptions()).thenReturn(batchOptions); + + LambdaSinkService objectUnderTest = createObjectUnderTest(lambdaSinkConfig); + Collection> recordsData = generateRecords(recordCount); + objectUnderTest.output(recordsData); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java new file mode 100644 index 0000000000..3e33a4e835 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.services.lambda.LambdaClient; + +public final class LambdaClientFactory { + private LambdaClientFactory() { } + + static LambdaClient createLambdaClient(final LambdaSinkConfig lambdaSinkConfig, + final AwsCredentialsSupplier awsCredentialsSupplier) { + final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(lambdaSinkConfig.getAwsAuthenticationOptions()); + final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); + + return LambdaClient.builder() + .region(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(awsCredentialsProvider) + .overrideConfiguration(createOverrideConfiguration(lambdaSinkConfig)).build(); + + } + + private static ClientOverrideConfiguration createOverrideConfiguration(final LambdaSinkConfig lambdaSinkConfig) { + final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(lambdaSinkConfig.getMaxConnectionRetries()).build(); + return ClientOverrideConfiguration.builder() + .retryPolicy(retryPolicy) + .build(); + } + + private static AwsCredentialsOptions convertToCredentialsOptions(final AwsAuthenticationOptions awsAuthenticationOptions) { + return AwsCredentialsOptions.builder() + .withRegion(awsAuthenticationOptions.getAwsRegion()) + .withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn()) + .withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId()) + .withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides()) + .build(); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java new file mode 100644 index 0000000000..f099da21cf --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSink.java @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; +import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.util.Collection; + +@DataPrepperPlugin(name = "lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class) +public class LambdaSink extends AbstractSink> { + + private static final Logger LOG = LoggerFactory.getLogger(LambdaSink.class); + private final DlqPushHandler dlqPushHandler; + private volatile boolean sinkInitialized; + private final LambdaSinkService lambdaSinkService; + private final BufferFactory bufferFactory; + private static final String BUCKET = "bucket"; + private static final String KEY_PATH = "key_path_prefix"; + + @DataPrepperPluginConstructor + public LambdaSink(final PluginSetting pluginSetting, + final LambdaSinkConfig lambdaSinkConfig, + final PluginFactory pluginFactory, + final SinkContext sinkContext, + final AwsCredentialsSupplier awsCredentialsSupplier + ) { + super(pluginSetting); + sinkInitialized = Boolean.FALSE; + OutputCodecContext outputCodecContext = OutputCodecContext.fromSinkContext(sinkContext); + LambdaClient lambdaClient = LambdaClientFactory.createLambdaClient(lambdaSinkConfig, awsCredentialsSupplier); + this.dlqPushHandler = new DlqPushHandler(pluginFactory, + String.valueOf(lambdaSinkConfig.getDlqPluginSetting().get(BUCKET)), + lambdaSinkConfig.getDlqStsRoleARN() + , lambdaSinkConfig.getDlqStsRegion(), + String.valueOf(lambdaSinkConfig.getDlqPluginSetting().get(KEY_PATH))); + this.bufferFactory = new InMemoryBufferFactory(); + + lambdaSinkService = new LambdaSinkService(lambdaClient, + lambdaSinkConfig, + pluginMetrics, + pluginFactory, + pluginSetting, + outputCodecContext, + awsCredentialsSupplier, + dlqPushHandler, + bufferFactory); + + } + + @Override + public boolean isReady() { + return sinkInitialized; + } + + @Override + public void doInitialize() { + try { + doInitializeInternal(); + } catch (InvalidPluginConfigurationException e) { + LOG.error("Invalid plugin configuration, Hence failed to initialize s3-sink plugin."); + this.shutdown(); + throw e; + } catch (Exception e) { + LOG.error("Failed to initialize lambda plugin."); + this.shutdown(); + throw e; + } + } + + private void doInitializeInternal() { + sinkInitialized = Boolean.TRUE; + } + + /** + * @param records Records to be output + */ + @Override + public void doOutput(final Collection> records) { + + if (records.isEmpty()) { + return; + } + lambdaSinkService.output(records); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java new file mode 100644 index 0000000000..b2169a2081 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfig.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.lambda; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; + +import java.util.Objects; +import java.util.Map; + +public class LambdaSinkConfig { + + private static final int DEFAULT_CONNECTION_RETRIES = 3; + + private static final Boolean DEFAULT_INVOCATION = false; + + public static final String STS_REGION = "region"; + + public static final String STS_ROLE_ARN = "sts_role_arn"; + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("function_name") + @NotEmpty + @NotNull + @Size(min = 3, max = 500, message = "function name length should be at least 3 characters") + private String functionName; + + @JsonProperty("max_retries") + private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES; + + @JsonProperty("sync") + private Boolean sync = DEFAULT_INVOCATION; + + @JsonProperty("dlq") + private PluginModel dlq; + + @JsonProperty("batch") + private BatchOptions batchOptions; + + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + public BatchOptions getBatchOptions(){return batchOptions;} + + public String getFunctionName() { + return functionName; + } + + public int getMaxConnectionRetries() { + return maxConnectionRetries; + } + + public Boolean getSync(){ + return sync; + } + + public PluginModel getDlq() { + return dlq; + } + + public String getDlqStsRoleARN(){ + return Objects.nonNull(getDlqPluginSetting().get(STS_ROLE_ARN)) ? + String.valueOf(getDlqPluginSetting().get(STS_ROLE_ARN)) : + awsAuthenticationOptions.getAwsStsRoleArn(); + } + + public String getDlqStsRegion(){ + return Objects.nonNull(getDlqPluginSetting().get(STS_REGION)) ? + String.valueOf(getDlqPluginSetting().get(STS_REGION)) : + awsAuthenticationOptions.getAwsRegion().toString(); + } + + public Map getDlqPluginSetting(){ + return dlq != null ? dlq.getPluginSettings() : Map.of(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java new file mode 100644 index 0000000000..d8805cace0 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkService.java @@ -0,0 +1,237 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.sink.lambda.codec.JsonCodec; +import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.sink.lambda.dlq.LambdaSinkFailedDlqData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class LambdaSinkService { + + private static final Logger LOG = LoggerFactory.getLogger(LambdaSinkService.class); + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaSinkObjectsEventsSucceeded"; + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaSinkObjectsEventsFailed"; + private final PluginSetting pluginSetting; + private final Lock reentrantLock; + private final LambdaSinkConfig lambdaSinkConfig; + private LambdaClient lambdaClient; + private final String functionName; + private static Boolean isSync; + private int maxEvents = 0; + private ByteCount maxBytes = null; + private Duration maxCollectionDuration = null; + private int maxRetries = 0; + private final Counter numberOfRecordsSuccessCounter; + private final Counter numberOfRecordsFailedCounter; + private final String SYNC_INVOCATION_TYPE = "RequestResponse"; + private final String ASYNC_INVOCATION_TYPE = "Event"; + private final String invocationType; + private Buffer currentBuffer; + private final BufferFactory bufferFactory; + private final DlqPushHandler dlqPushHandler; + private final Collection bufferedEventHandles; + private final List events; + private OutputCodec codec = null; + private final BatchOptions batchOptions; + private Boolean isBatchEnabled; + private OutputCodecContext codecContext = null; + private String batchKey; + + public LambdaSinkService(final LambdaClient lambdaClient, + final LambdaSinkConfig lambdaSinkConfig, + final PluginMetrics pluginMetrics, + final PluginFactory pluginFactory, + final PluginSetting pluginSetting, + final OutputCodecContext codecContext, + final AwsCredentialsSupplier awsCredentialsSupplier, + final DlqPushHandler dlqPushHandler, + final BufferFactory bufferFactory) { + this.lambdaSinkConfig = lambdaSinkConfig; + this.pluginSetting = pluginSetting; + this.dlqPushHandler = dlqPushHandler; + this.lambdaClient = lambdaClient; + reentrantLock = new ReentrantLock(); + numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS); + numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED); + functionName = lambdaSinkConfig.getFunctionName(); + + maxRetries = lambdaSinkConfig.getMaxConnectionRetries(); + batchOptions = lambdaSinkConfig.getBatchOptions(); + + if (!Objects.isNull(batchOptions)){ + maxEvents = batchOptions.getThresholdOptions().getEventCount(); + maxBytes = batchOptions.getThresholdOptions().getMaximumSize(); + maxCollectionDuration = batchOptions.getThresholdOptions().getEventCollectTimeOut(); + batchKey = batchOptions.getBatchKey(); + isBatchEnabled = true; + }else{ + batchKey = null; + isBatchEnabled = false; + } + Map map = new HashMap<>(); + this.codecContext = codecContext; + + codec = new JsonCodec(batchKey); + bufferedEventHandles = new LinkedList<>(); + events = new ArrayList(); + + isSync = lambdaSinkConfig.getSync(); + if(isSync){ + invocationType = SYNC_INVOCATION_TYPE; + }else{ + invocationType = ASYNC_INVOCATION_TYPE; + } + + this.bufferFactory = bufferFactory; + try { + currentBuffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + public void output(Collection> records){ + // Don't acquire the lock if there's no work to be done + if (records.isEmpty() && currentBuffer.getEventCount() == 0) { + return; + } + List failedEvents = new ArrayList<>(); + Exception sampleException = null; + reentrantLock.lock(); + try { + for (Record record : records) { + final Event event = record.getData(); + try { + if (currentBuffer.getEventCount() == 0) { + codec.start(currentBuffer.getOutputStream(), event, codecContext); + } + codec.writeEvent(event, currentBuffer.getOutputStream()); + int count = currentBuffer.getEventCount() + 1; + currentBuffer.setEventCount(count); + + bufferedEventHandles.add(event.getEventHandle()); + } catch (Exception ex) { + if(sampleException == null) { + sampleException = ex; + } + failedEvents.add(event); + } + + flushToLambdaIfNeeded(); + } + } finally { + reentrantLock.unlock(); + } + + if(!failedEvents.isEmpty()) { + failedEvents + .stream() + .map(Event::getEventHandle) + .forEach(eventHandle -> eventHandle.release(false)); + LOG.error("Unable to add {} events to buffer. Dropping these events. Sample exception provided.", failedEvents.size(), sampleException); + } + } + + private void releaseEventHandles(final boolean result) { + for (EventHandle eventHandle : bufferedEventHandles) { + eventHandle.release(result); + } + bufferedEventHandles.clear(); + } + + private void flushToLambdaIfNeeded() { + LOG.trace("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", + currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration()); + final AtomicReference errorMsgObj = new AtomicReference<>(); + + try { + if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration, isBatchEnabled)) { + try { + codec.complete(currentBuffer.getOutputStream()); + LOG.info("Writing {} to Lambda with {} events and size of {} bytes.", + functionName, currentBuffer.getEventCount(), currentBuffer.getSize()); + final boolean isFlushToLambda = retryFlushToLambda(currentBuffer, errorMsgObj); + + if (isFlushToLambda) { + LOG.info("Successfully flushed to Lambda {}.", functionName); + numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount()); + } else { + LOG.error("Failed to save to Lambda {}", functionName); + numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); + SdkBytes payload = currentBuffer.getPayload(); + dlqPushHandler.perform(pluginSetting,new LambdaSinkFailedDlqData(payload,errorMsgObj.get(),0)); + } + //release even if failed + releaseEventHandles(true); + //reset buffer after flush + currentBuffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); + } catch (final IOException e) { + LOG.error("Exception while completing codec", e); + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + protected boolean retryFlushToLambda(Buffer currentBuffer, + final AtomicReference errorMsgObj) throws InterruptedException { + boolean isUploadedToLambda = Boolean.FALSE; + int retryCount = maxRetries; + SdkBytes payload; + do { + + try { + currentBuffer.flushToLambda(); + isUploadedToLambda = Boolean.TRUE; + } catch (AwsServiceException | SdkClientException e) { + errorMsgObj.set(e.getMessage()); + LOG.error("Exception occurred while uploading records to lambda. Retry countdown : {} | exception:", + retryCount, e); + --retryCount; + if (retryCount == 0) { + return isUploadedToLambda; + } + Thread.sleep(5000); + } + } while (!isUploadedToLambda); + return isUploadedToLambda; + } +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java new file mode 100644 index 0000000000..74aa98e7f9 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheck.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; + +import java.time.Duration; + +/** + * Check threshold limits. + */ +public class ThresholdCheck { + + private ThresholdCheck() { + } + + public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final Duration maxCollectionDuration, final Boolean isBatchEnabled) { + if (!isBatchEnabled) return true; + + if (maxEvents > 0) { + return currentBuffer.getEventCount() + 1 > maxEvents || + currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0 || + currentBuffer.getSize() > maxBytes.getBytes(); + } else { + return currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0 || + currentBuffer.getSize() > maxBytes.getBytes(); + } + } +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java new file mode 100644 index 0000000000..9140d258fe --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/Buffer.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; + +import software.amazon.awssdk.core.SdkBytes; + +import java.io.OutputStream; +import java.time.Duration; + +/** + * A buffer can hold data before flushing it to S3. + */ +public interface Buffer { + + long getSize(); + + int getEventCount(); + + Duration getDuration(); + + void flushToLambda(); + + OutputStream getOutputStream(); + + SdkBytes getPayload(); + + void setEventCount(int eventCount); +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java new file mode 100644 index 0000000000..80afd2f1ca --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/BufferFactory.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; + +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.io.IOException; + +public interface BufferFactory { + Buffer getBuffer(LambdaClient lambdaClient, String functionName, String invocationType) throws IOException; +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java new file mode 100644 index 0000000000..9785be778b --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBuffer.java @@ -0,0 +1,112 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.time.StopWatch; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.model.InvokeRequest; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * A buffer can hold in memory data and flushing it to S3. + */ +public class InMemoryBuffer implements Buffer { + + private static final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + + private final LambdaClient lambdaClient; + private final String functionName; + private final String invocationType; + private int eventCount; + private final StopWatch watch; + private boolean isCodecStarted; + + + public InMemoryBuffer(LambdaClient lambdaClient, String functionName, String invocationType) { + this.lambdaClient = lambdaClient; + this.functionName = functionName; + this.invocationType = invocationType; + + byteArrayOutputStream.reset(); + eventCount = 0; + watch = new StopWatch(); + watch.start(); + isCodecStarted = false; + } + + @Override + public long getSize() { + return byteArrayOutputStream.size(); + } + + @Override + public int getEventCount() { + return eventCount; + } + + public Duration getDuration() { + return Duration.ofMillis(watch.getTime(TimeUnit.MILLISECONDS)); + } + + /** + * Upload accumulated data to s3 bucket. + */ + @Override + public void flushToLambda() { + InvokeResponse resp; + SdkBytes payload = getPayload(); + + // Setup an InvokeRequest. + InvokeRequest request = InvokeRequest.builder() + .functionName(functionName) + .payload(payload) + .invocationType(invocationType) + .build(); + + resp = lambdaClient.invoke(request); + } + + private SdkBytes validatePayload(String payload_string) { + ObjectMapper mapper = new ObjectMapper(); + try { + JsonNode jsonNode = mapper.readTree(byteArrayOutputStream.toByteArray()); + + // Convert the JsonNode back to a String to normalize it (removes extra spaces, trailing commas, etc.) + String normalizedJson = mapper.writeValueAsString(jsonNode); + return SdkBytes.fromUtf8String(normalizedJson); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + @Override + public void setEventCount(int eventCount) { + this.eventCount = eventCount; + } + + @Override + public OutputStream getOutputStream() { + return byteArrayOutputStream; + } + + @Override + public SdkBytes getPayload() { + byte[] bytes = byteArrayOutputStream.toByteArray(); + SdkBytes sdkBytes = SdkBytes.fromByteArray(bytes); + return sdkBytes; + } +} + diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java new file mode 100644 index 0000000000..e58952c5cb --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/accumlator/InMemoryBufferFactory.java @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumlator; + +import software.amazon.awssdk.services.lambda.LambdaClient; + + +public class InMemoryBufferFactory implements BufferFactory { + @Override + public Buffer getBuffer(LambdaClient lambdaClient, String functionName, String invocationType){ + return new InMemoryBuffer(lambdaClient, functionName, invocationType); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/JsonCodec.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/JsonCodec.java new file mode 100644 index 0000000000..471ea351c6 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/JsonCodec.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.lambda.codec; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.Objects; + +public class JsonCodec implements OutputCodec { + private final ObjectMapper objectMapper = new ObjectMapper(); + private static final String JSON = "json"; + private static final JsonFactory factory = new JsonFactory(); + private JsonGenerator generator; + private OutputCodecContext codecContext; + private final String keyName; + + public JsonCodec(final String keyName) { + this.keyName = keyName; + } + + @Override + public String getExtension() { + return JSON; + } + + @Override + public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(codecContext); + this.codecContext = codecContext; + generator = factory.createGenerator(outputStream, JsonEncoding.UTF8); + if(Objects.nonNull(keyName)){ + generator.writeStartObject(); + generator.writeFieldName(keyName); + generator.writeStartArray(); + } + } + + @Override + public void complete(final OutputStream outputStream) throws IOException { + if(!Objects.isNull(keyName)) { + generator.writeEndArray(); + generator.writeEndObject(); + } + + generator.close(); + outputStream.flush(); + outputStream.close(); + } + + @Override + public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException { + Objects.requireNonNull(event); + if(Objects.isNull(keyName)) { + Map eventMap = event.toMap(); + objectMapper.writeValue(outputStream, eventMap); + + }else{ + Map dataMap = event.toMap(); //(event); + objectMapper.writeValue(generator, dataMap); + } + generator.flush(); + } +} + + + + diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java new file mode 100644 index 0000000000..8d6c64829d --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/AwsAuthenticationOptions.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.regions.Region; + +import java.util.Map; + +public class AwsAuthenticationOptions { + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_external_id") + @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters") + private String awsStsExternalId; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + public Region getAwsRegion() { + return awsRegion != null ? Region.of(awsRegion) : null; + } + + public String getAwsStsRoleArn() { + return awsStsRoleArn; + } + + public String getAwsStsExternalId() { + return awsStsExternalId; + } + + public Map getAwsStsHeaderOverrides() { + return awsStsHeaderOverrides; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java new file mode 100644 index 0000000000..3773d4e6ed --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/BatchOptions.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + + +public class BatchOptions { + + private static final String DEFAULT_BATCH_KEY = "events"; + + @JsonProperty("batch_key") + private String batchKey = DEFAULT_BATCH_KEY; + + @JsonProperty("threshold") + @NotNull + ThresholdOptions thresholdOptions; + + public String getBatchKey(){return batchKey;} + + public ThresholdOptions getThresholdOptions(){return thresholdOptions;} + +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java new file mode 100644 index 0000000000..1dcaf6de64 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptions.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.hibernate.validator.constraints.time.DurationMax; +import org.hibernate.validator.constraints.time.DurationMin; +import org.opensearch.dataprepper.model.types.ByteCount; +import java.time.Duration; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; + +/** + * An implementation class of s3 index configuration Options + */ +public class ThresholdOptions { + + private static final String DEFAULT_BYTE_CAPACITY = "6mb"; + + @JsonProperty("event_count") + @Size(min = 0, max = 10000000, message = "event_count size should be between 0 and 10000000") + @NotNull + private int eventCount; + + @JsonProperty("maximum_size") + private String maximumSize = DEFAULT_BYTE_CAPACITY; + + @JsonProperty("event_collect_timeout") + @DurationMin(seconds = 1) + @DurationMax(seconds = 3600) + @NotNull + private Duration eventCollectTimeOut; + + /** + * Read event collection duration configuration. + * @return event collect time out. + */ + public Duration getEventCollectTimeOut() { + return eventCollectTimeOut; + } + + /** + * Read byte capacity configuration. + * @return maximum byte count. + */ + public ByteCount getMaximumSize() { + return ByteCount.parse(maximumSize); + } + + /** + * Read the event count configuration. + * @return event count. + */ + public int getEventCount() { + return eventCount; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java new file mode 100644 index 0000000000..1bdeb0a394 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandler.java @@ -0,0 +1,131 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.lambda.dlq; + +import com.fasterxml.jackson.databind.ObjectWriter; +import io.micrometer.core.instrument.util.StringUtils; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.failures.DlqObject; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.plugins.dlq.DlqProvider; +import org.opensearch.dataprepper.plugins.dlq.DlqWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.StringJoiner; + +import static java.util.UUID.randomUUID; + + +/** + * * An Handler class which helps log failed data to AWS S3 bucket or file based on configuration. + */ + +public class DlqPushHandler { + + private static final Logger LOG = LoggerFactory.getLogger(DlqPushHandler.class); + + private static final String BUCKET = "bucket"; + + private static final String ROLE_ARN = "sts_role_arn"; + + private static final String REGION = "region"; + + private static final String S3_PLUGIN_NAME = "s3"; + + private static final String KEY_PATH_PREFIX = "key_path_prefix"; + + private String dlqFile; + + private String keyPathPrefix; + + private DlqProvider dlqProvider; + + private ObjectWriter objectWriter; + + public DlqPushHandler( + final PluginFactory pluginFactory, + final String bucket, + final String stsRoleArn, + final String awsRegion, + final String dlqPathPrefix) { + + this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,dlqPathPrefix); + } + + public void perform(final PluginSetting pluginSetting, + final Object failedData) { + if(dlqFile != null) + writeToFile(failedData); + else + pushToS3(pluginSetting, failedData); + } + + private void writeToFile(Object failedData) { + try(BufferedWriter dlqFileWriter = Files.newBufferedWriter(Paths.get(dlqFile), + StandardOpenOption.CREATE, StandardOpenOption.APPEND)) { + dlqFileWriter.write(objectWriter.writeValueAsString(failedData)+"\n"); + } catch (IOException e) { + LOG.error("Exception while writing failed data to DLQ file Exception: ",e); + } + } + + private void pushToS3(PluginSetting pluginSetting, Object failedData) { + DlqWriter dlqWriter = getDlqWriter(pluginSetting.getPipelineName()); + try { + String pluginId = randomUUID().toString(); + DlqObject dlqObject = DlqObject.builder() + .withPluginId(pluginId) + .withPluginName(pluginSetting.getName()) + .withPipelineName(pluginSetting.getPipelineName()) + .withFailedData(failedData) + .build(); + final List dlqObjects = Arrays.asList(dlqObject); + dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginId); + LOG.info("wrote {} events to DLQ",dlqObjects.size()); + } catch (final IOException e) { + LOG.error("Exception while writing failed data to DLQ, Exception : ", e); + } + } + + private DlqWriter getDlqWriter(final String writerPipelineName) { + Optional potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER) + .add(writerPipelineName).toString()); + DlqWriter dlqWriter = potentialDlq.isPresent() ? potentialDlq.get() : null; + return dlqWriter; + } + + private DlqProvider getDlqProvider(final PluginFactory pluginFactory, + final String bucket, + final String stsRoleArn, + final String awsRegion, + final String dlqPathPrefix) { + final Map props = new HashMap<>(); + props.put(BUCKET, bucket); + props.put(ROLE_ARN, stsRoleArn); + props.put(REGION, awsRegion); + this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix); + props.put(KEY_PATH_PREFIX, dlqPathPrefix); + final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + DlqProvider dlqProvider = pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting); + return dlqProvider; + } + + private String enforceDefaultDelimiterOnKeyPathPrefix(final String keyPathPrefix) { + return (keyPathPrefix.charAt(keyPathPrefix.length() - 1) == '/') ? keyPathPrefix : keyPathPrefix.concat("/"); + } +} + diff --git a/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java new file mode 100644 index 0000000000..0808010e37 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/LambdaSinkFailedDlqData.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.lambda.dlq; + +import com.fasterxml.jackson.core.JsonProcessingException; +import software.amazon.awssdk.core.SdkBytes; + + +public class LambdaSinkFailedDlqData { + + private SdkBytes payload; + + private String message; + + private int status; + + public LambdaSinkFailedDlqData(SdkBytes payload, String message, int status) throws JsonProcessingException { + this.payload = payload; + this.message = message; + this.status = status; + } + + public SdkBytes getPayload() { + return payload; + } + + public LambdaSinkFailedDlqData setPayload(SdkBytes payload) { + this.payload = payload; + return this; + } + + public String getMessage() { + return message; + } + + public LambdaSinkFailedDlqData setMessage(String message) { + this.message = message; + return this; + } + + public int getStatus() { + return status; + } + + public LambdaSinkFailedDlqData setStatus(int status) { + this.status = status; + return this; + } + + @Override + public String toString() { + + return "failedData\n" + + "payload \"" + payload.asUtf8String() + "\"\n" + + "message \"" + message + "\"\n" + + "status \"" + status + "\n"; + } +} diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java new file mode 100644 index 0000000000..ab72ee44b8 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaClientFactoryTest.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.LambdaClientBuilder; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.UUID; + +@ExtendWith(MockitoExtension.class) +class LambdaClientFactoryTest { + @Mock + private LambdaSinkConfig lambdaSinkConfig; + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + + @Mock + private AwsAuthenticationOptions awsAuthenticationOptions; + + @BeforeEach + void setUp() { + when(lambdaSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + } + + @Test + void createLambdaClient_with_real_LambdaClient() { + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); + final LambdaClient lambdaClient = LambdaClientFactory.createLambdaClient(lambdaSinkConfig, awsCredentialsSupplier); + + assertThat(lambdaClient, notNullValue()); + } + + @ParameterizedTest + @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) + void createlambdaClient_provides_correct_inputs(final String regionString) { + final Region region = Region.of(regionString); + final String stsRoleArn = UUID.randomUUID().toString(); + final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region); + when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn); + when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides); + + final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class); + when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider); + + final LambdaClientBuilder lambdaClientBuilder = mock(LambdaClientBuilder.class); + when(lambdaClientBuilder.region(region)).thenReturn(lambdaClientBuilder); + when(lambdaClientBuilder.credentialsProvider(any())).thenReturn(lambdaClientBuilder); + when(lambdaClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(lambdaClientBuilder); + try(final MockedStatic lambdaClientMockedStatic = mockStatic(LambdaClient.class)) { + lambdaClientMockedStatic.when(LambdaClient::builder) + .thenReturn(lambdaClientBuilder); + LambdaClientFactory.createLambdaClient(lambdaSinkConfig, awsCredentialsSupplier); + } + + final ArgumentCaptor credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class); + verify(lambdaClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture()); + + final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue(); + + assertThat(actualCredentialsProvider, equalTo(expectedCredentialsProvider)); + + final ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); + verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture()); + + final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue(); + assertThat(actualCredentialsOptions.getRegion(), equalTo(region)); + assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); + assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java new file mode 100644 index 0000000000..b6ceba83dc --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkConfigTest.java @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.lambda; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.regions.Region; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + + +class LambdaSinkConfigTest { + public static final int DEFAULT_MAX_RETRIES = 3; + private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + Boolean DEFAULT_INVOCATION = false; + + @Test + void lambda_sink_default_max_connection_retries_test(){ + assertThat(new LambdaSinkConfig().getMaxConnectionRetries(),equalTo(DEFAULT_MAX_RETRIES)); + } + + @Test + void lambda_sink_default_invocation_type_test(){ + assertThat(new LambdaSinkConfig().getSync(),equalTo(DEFAULT_INVOCATION)); + } + + + @Test + void lambda_sink_pipeline_config_test() throws JsonProcessingException { + final String config = + " function_name: test_function\n" + + " aws:\n" + + " region: ap-south-1\n" + + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + + " sts_header_overrides: {\"test\":\"test\"}\n" + + " max_retries: 10\n" + + " dlq:\n" + + " s3:\n" + + " bucket: test\n" + + " key_path_prefix: test\n" + + " region: ap-south-1\n" + + " sts_role_arn: test-role-arn\n"; + final LambdaSinkConfig lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class); + assertThat(lambdaSinkConfig.getMaxConnectionRetries(),equalTo(10)); + assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion(),equalTo(Region.AP_SOUTH_1)); + assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(),equalTo("arn:aws:iam::524239988912:role/app-test")); + assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsHeaderOverrides().get("test"),equalTo("test")); + assertThat(lambdaSinkConfig.getDlqStsRegion(),equalTo("ap-south-1")); + assertThat(lambdaSinkConfig.getDlqStsRoleARN(),equalTo("test-role-arn")); + } + + @Test + void lambda_sink_pipeline_config_test_with_no_dlq() throws JsonProcessingException { + final String config = + " function_name: test_function\n" + + " aws:\n" + + " region: ap-south-1\n" + + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + + " sts_header_overrides: {\"test\":\"test\"}\n" + + " max_retries: 10\n"; + final LambdaSinkConfig lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class); + assertThat(lambdaSinkConfig.getMaxConnectionRetries(),equalTo(10)); + assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion(),equalTo(Region.AP_SOUTH_1)); + assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(),equalTo("arn:aws:iam::524239988912:role/app-test")); + assertThat(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsStsHeaderOverrides().get("test"),equalTo("test")); + assertThat(lambdaSinkConfig.getDlqStsRegion(),equalTo("ap-south-1")); + assertThat(lambdaSinkConfig.getDlqStsRoleARN(),equalTo("arn:aws:iam::524239988912:role/app-test")); + assertThat(lambdaSinkConfig.getDlqPluginSetting().get("key"),equalTo(null)); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java new file mode 100644 index 0000000000..d3254b6179 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkServiceTest.java @@ -0,0 +1,358 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.lambda; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBuffer; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.config.ThresholdOptions; +import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.sink.lambda.dlq.LambdaSinkFailedDlqData; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.http.SdkHttpResponse; +import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.model.InvokeRequest; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; + + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +public class LambdaSinkServiceTest { + + public static final int maxEvents = 10; + public static final int maxRetries = 3; + public static final String region = "us-east-1"; + public static final String maxSize = "1kb"; + public static final String functionName = "testFunction"; + public static final String invocationType = "event"; + public static final String batchKey ="lambda_batch_key"; + public static final String config = + " function_name: testFunction\n" + + " aws:\n" + + " region: us-east-1\n" + + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + + " sts_header_overrides: {\"test\":\"test\"}\n" + + " sync: false\n" + + " max_retries: 10\n"; + + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + private LambdaSinkConfig lambdaSinkConfig; + private LambdaClient lambdaClient; + private PluginMetrics pluginMetrics; + private Counter numberOfRecordsSuccessCounter; + private Counter numberOfRecordsFailedCounter; + private DlqPushHandler dlqPushHandler; + private Buffer buffer; + private BufferFactory bufferFactory; + + + private InvokeResponse invokeResponse; + + private SdkHttpResponse sdkHttpResponse; + + InvokeResponse mockResponse; + + @BeforeEach + public void setup() throws IOException { + this.lambdaClient = mock(LambdaClient.class); + this.pluginMetrics = mock(PluginMetrics.class); + this.buffer = mock(InMemoryBuffer.class); + this.lambdaSinkConfig = mock(LambdaSinkConfig.class); + this.numberOfRecordsSuccessCounter = mock(Counter.class); + this.numberOfRecordsFailedCounter = mock(Counter.class); + this.dlqPushHandler = mock(DlqPushHandler.class); + this.bufferFactory = mock(BufferFactory.class); + when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)).thenReturn(numberOfRecordsSuccessCounter); + when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).thenReturn(numberOfRecordsFailedCounter); + mockResponse = InvokeResponse.builder() + .statusCode(200) // HTTP 200 for successful invocation + .payload(SdkBytes.fromString("{\"key\": \"value\"}", java.nio.charset.StandardCharsets.UTF_8)) + .build(); + } + + private LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig) throws IOException { + bufferFactory = new InMemoryBufferFactory(); + buffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); + return new LambdaSinkService(lambdaClient, + lambdaSinkConfig, + pluginMetrics, + mock(PluginFactory.class), + mock(PluginSetting.class), + mock(OutputCodecContext.class), + mock(AwsCredentialsSupplier.class), + dlqPushHandler, + bufferFactory); + } + + private LambdaSinkService createObjectUnderTest(String config) throws IOException { + this.lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class); + bufferFactory = new InMemoryBufferFactory(); + buffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); + return new LambdaSinkService(lambdaClient, + lambdaSinkConfig, + pluginMetrics, + mock(PluginFactory.class), + mock(PluginSetting.class), + mock(OutputCodecContext.class), + mock(AwsCredentialsSupplier.class), + dlqPushHandler, + bufferFactory); + } + + @Test + public void lambda_sink_test_with_empty_payload_records() throws IOException { + numberOfRecordsSuccessCounter = mock(Counter.class); + LambdaSinkService lambdaSinkService = createObjectUnderTest(config); + lambdaSinkService.output(List.of()); + verifyNoInteractions(lambdaClient); + verifyNoInteractions(numberOfRecordsSuccessCounter); + verifyNoInteractions(numberOfRecordsFailedCounter); + } + + + @Test + public void lambda_sink_test_with_single_record_success_push_to_lambda() throws IOException { + LambdaSinkService lambdaSinkService = createObjectUnderTest(config); + + Map map = new HashMap<>(); + map.put("query1","test1"); + map.put("query2","test2"); + + final Record eventRecord = new Record<>(JacksonEvent.builder().withData(map).withEventType("event").build()); + Collection> records = List.of(eventRecord); + final ArgumentCaptor invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class); + + when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse); + lambdaSinkService.output(records); + + verify(lambdaClient).invoke(invokeRequestCaptor.capture()); + final InvokeRequest actualRequest = invokeRequestCaptor.getValue(); + assertEquals(actualRequest.functionName(), "testFunction"); + assertEquals(actualRequest.invocationType().toString(), "Event"); + verify(numberOfRecordsSuccessCounter).increment(records.size()); + } + + @Test + public void lambda_sink_test_with_request_response_invocation_success_push_to_lambda() throws IOException { + final String config = + " function_name: test_function\n" + + " aws:\n" + + " region: us-east-1\n" + + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + + " sts_header_overrides: {\"test\":\"test\"}\n" + + " sync: true\n" + + " max_retries: 10\n"; + + + when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse); + + LambdaSinkService lambdaSinkService = createObjectUnderTest(config); + final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")); + Collection> records = List.of(eventRecord); + final ArgumentCaptor invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class); + + lambdaSinkService.output(records); + + verify(lambdaClient).invoke(invokeRequestCaptor.capture()); + final InvokeRequest actualRequest = invokeRequestCaptor.getValue(); + assertEquals(actualRequest.functionName(), "test_function"); + assertEquals(actualRequest.invocationType().toString(), "RequestResponse"); + verify(numberOfRecordsSuccessCounter).increment(records.size()); + } + + @Test + public void lambda_sink_test_max_retires_works() throws IOException { + final String config = + " function_name: test_function\n" + + " aws:\n" + + " region: us-east-1\n" + + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + + " sts_header_overrides: {\"test\":\"test\"}\n" + + " sync: true\n" + + " max_retries: 3\n"; + this.buffer = mock(InMemoryBuffer.class); + when(lambdaClient.invoke(any(InvokeRequest.class))).thenThrow(AwsServiceException.class); + doNothing().when(dlqPushHandler).perform(any(PluginSetting.class), any(LambdaSinkFailedDlqData.class)); + + this.lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class); + bufferFactory = mock(BufferFactory.class); + buffer = mock(Buffer.class); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + when(buffer.getOutputStream()).thenReturn(byteArrayOutputStream); + when(bufferFactory.getBuffer(any(LambdaClient.class),any(),any())).thenReturn(buffer); + doThrow(AwsServiceException.class).when(buffer).flushToLambda(); + + LambdaSinkService lambdaSinkService = new LambdaSinkService(lambdaClient, + lambdaSinkConfig, + pluginMetrics, + mock(PluginFactory.class), + mock(PluginSetting.class), + mock(OutputCodecContext.class), + mock(AwsCredentialsSupplier.class), + dlqPushHandler, + bufferFactory); + + final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")); + Collection> records = List.of(eventRecord); + lambdaSinkService.output(records); + + verify(buffer, times(3)).flushToLambda(); + } + + @Test + public void lambda_sink_test_dlq_works() throws IOException { + final String config = + " function_name: test_function\n" + + " aws:\n" + + " region: us-east-1\n" + + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + + " sts_header_overrides: {\"test\":\"test\"}\n" + + " sync: true\n" + + " max_retries: 3\n"; + + when(lambdaClient.invoke(any(InvokeRequest.class))).thenThrow(AwsServiceException.class); + doNothing().when(dlqPushHandler).perform(any(PluginSetting.class), any(LambdaSinkFailedDlqData.class)); + + this.lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class); + bufferFactory = mock(BufferFactory.class); + buffer = mock(Buffer.class); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + when(buffer.getOutputStream()).thenReturn(byteArrayOutputStream); + when(bufferFactory.getBuffer(any(LambdaClient.class),any(),any())).thenReturn(buffer); + + doThrow(AwsServiceException.class).when(buffer).flushToLambda(); + + LambdaSinkService lambdaSinkService = new LambdaSinkService(lambdaClient, + lambdaSinkConfig, + pluginMetrics, + mock(PluginFactory.class), + mock(PluginSetting.class), + mock(OutputCodecContext.class), + mock(AwsCredentialsSupplier.class), + dlqPushHandler, + bufferFactory); + + final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")); + Collection> records = List.of(eventRecord); + + lambdaSinkService.output(records); + + verify(buffer, times(3)).flushToLambda(); + verify(dlqPushHandler,times(1)).perform(any(PluginSetting.class),any(Object.class)); + } + + @Test + public void lambda_sink_test_with_multiple_record_success_push_to_lambda() throws IOException { + LambdaSinkService lambdaSinkService = createObjectUnderTest(config); + final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")); + Collection> records = new ArrayList<>(); + int totalRecords = 11; + for(int recordSize = 0; recordSize < totalRecords ; recordSize++) { + records.add(eventRecord); + } + when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse); + + lambdaSinkService.output(records); + + verify(lambdaClient,times(totalRecords)).invoke(any(InvokeRequest.class)); + + } + + @Test + void lambda_sink_service_test_output_with_single_record_ack_release() throws IOException { + final LambdaSinkService lambdaSinkService = createObjectUnderTest(config); + final Event event = mock(Event.class); + given(event.toJsonString()).willReturn("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"); + given(event.getEventHandle()).willReturn(mock(EventHandle.class)); + + final ArgumentCaptor invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class); + when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse); + + lambdaSinkService.output(List.of(new Record<>(event))); + + verify(lambdaClient,times(1)).invoke(invokeRequestCaptor.capture()); + final InvokeRequest actualRequest = invokeRequestCaptor.getValue(); + assertThat(actualRequest.functionName(), equalTo("testFunction")); + assertThat(actualRequest.invocationType().toString(), equalTo("Event")); + verify(numberOfRecordsSuccessCounter).increment(1); + } + + @Test + public void lambda_sink_test_batch_enabled() throws IOException { + when(lambdaSinkConfig.getSync()).thenReturn(false); + when(lambdaSinkConfig.getFunctionName()).thenReturn(functionName); + when(lambdaSinkConfig.getMaxConnectionRetries()).thenReturn(maxRetries); + when(lambdaSinkConfig.getBatchOptions()).thenReturn(mock(BatchOptions.class)); + when(lambdaSinkConfig.getBatchOptions().getBatchKey()).thenReturn(batchKey); + when(lambdaSinkConfig.getBatchOptions().getThresholdOptions()).thenReturn(mock(ThresholdOptions.class)); + when(lambdaSinkConfig.getBatchOptions().getThresholdOptions().getEventCount()).thenReturn(maxEvents); + when(lambdaSinkConfig.getBatchOptions().getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse(maxSize)); + when(lambdaSinkConfig.getBatchOptions().getThresholdOptions().getEventCollectTimeOut()).thenReturn(Duration.ofNanos(10L)); + when(lambdaSinkConfig.getAwsAuthenticationOptions()).thenReturn(mock(AwsAuthenticationOptions.class)); + + LambdaSinkService lambdaSinkService = createObjectUnderTest(lambdaSinkConfig); + + Map map = new HashMap<>(); + map.put("query1","test1"); + map.put("query2","test2"); + + String expected_payload = "{\"lambda_batch_key\":[{\"query1\":\"test1\",\"query2\":\"test2\"}]}"; + final Record eventRecord = new Record<>(JacksonEvent.builder().withData(map).withEventType("event").build()); + Collection> records = List.of(eventRecord); + final ArgumentCaptor invokeRequestCaptor = ArgumentCaptor.forClass(InvokeRequest.class); + + when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse); + lambdaSinkService.output(records); + + verify(lambdaClient).invoke(invokeRequestCaptor.capture()); + final InvokeRequest actualRequest = invokeRequestCaptor.getValue(); + assertEquals(actualRequest.functionName(), functionName); + assertEquals(actualRequest.invocationType().toString(), "Event"); + String actualRequestPayload = actualRequest.payload().asUtf8String(); + assertEquals(actualRequestPayload, expected_payload ); + verify(numberOfRecordsSuccessCounter).increment(records.size()); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java new file mode 100644 index 0000000000..1b33dff590 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/LambdaSinkTest.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaClient; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class LambdaSinkTest { + + public static final String S3_REGION = "us-east-1"; + public static final String CODEC_PLUGIN_NAME = "json"; + public static final String SINK_PLUGIN_NAME = "lambda"; + public static final String SINK_PIPELINE_NAME = "lambda-sink-pipeline"; + private LambdaSinkConfig lambdaSinkConfig; + private LambdaSink lambdaSink; + LambdaClient lambdaClient; + private PluginSetting pluginSetting; + private PluginFactory pluginFactory; + private AwsCredentialsSupplier awsCredentialsSupplier; + private SinkContext sinkContext; + + @BeforeEach + void setUp() { + lambdaSinkConfig = mock(LambdaSinkConfig.class); + sinkContext = mock(SinkContext.class); + AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + pluginSetting = mock(PluginSetting.class); + PluginModel pluginModel = mock(PluginModel.class); + pluginFactory = mock(PluginFactory.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); + Map dlqMap = mock(HashMap.class); + LambdaClient lambdaClient = mock(LambdaClient.class); + + + when(lambdaSinkConfig.getDlq()).thenReturn(pluginModel); + when(pluginModel.getPluginSettings()).thenReturn(dlqMap); + when(lambdaSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION)); + when(pluginModel.getPluginName()).thenReturn(CODEC_PLUGIN_NAME); + when(pluginSetting.getName()).thenReturn(SINK_PLUGIN_NAME); + when(pluginSetting.getPipelineName()).thenReturn(SINK_PIPELINE_NAME); + } + + private LambdaSink createObjectUnderTest() { + return new LambdaSink(pluginSetting, lambdaSinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier); + } + + @Test + void test_lambda_sink_plugin_isReady_positive() { + lambdaSink = createObjectUnderTest(); + Assertions.assertNotNull(lambdaSink); + Assertions.assertNotNull(lambdaSinkConfig); + lambdaSink.doInitialize(); + assertTrue(lambdaSink.isReady(), "lambda sink is not initialized and not ready to work"); + } + + @Test + void test_lambda_sink_plugin_isReady_negative() { + lambdaSink = createObjectUnderTest(); + Assertions.assertNotNull(lambdaSink); + assertFalse(lambdaSink.isReady(), "lambda sink is initialized and ready to work"); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java new file mode 100644 index 0000000000..b63553911a --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/ThresholdCheckTest.java @@ -0,0 +1,126 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; + +import java.io.IOException; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ThresholdCheckTest { + + @Mock(lenient = true) + private Buffer buffer; + private int maxEvents; + private ByteCount maxBytes; + private Duration maxCollectionDuration; + private Boolean isBatchEnabled; + + @BeforeEach + void setUp() { + maxEvents = 10_000; + maxBytes = ByteCount.parse("48mb"); + maxCollectionDuration = Duration.ofMinutes(5); + isBatchEnabled = true; + } + + @Test + void test_exceedThreshold_true_dueTo_maxEvents_is_greater_than_buffered_event_count() throws IOException { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents + 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertTrue(isThresholdExceed, "Threshold exceeded"); + } + + @Test + void test_exceedThreshold_false_dueTo_maxEvents_is_less_than_buffered_event_count() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(this.maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertFalse(isThresholdExceed, "Threshold exceeded"); + } + + @Test + void test_exceedThreshold_true_dueTo_maxBytes_is_greater_than_buffered_byte_count() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() + 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertTrue(isThresholdExceed, "Threshold not exceeded"); + } + + @Test + void test_exceedThreshold_false_dueTo_maxBytes_is_less_than_buffered_byte_count() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertFalse(isThresholdExceed, "Threshold exceeded"); + } + + @Test + void test_exceedThreshold_true_dueTo_maxCollectionDuration_is_greater_than_buffered_event_collection_duration() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.plusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertTrue(isThresholdExceed, "Threshold not exceeded"); + } + + @Test + void test_exceedThreshold_false_dueTo_maxCollectionDuration_is_less_than_buffered_event_collection_duration() { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents - 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + + assertFalse(isThresholdExceed, "Threshold exceeded"); + } + + @Test + void test_exceedThreshold_when_batch_is_enabled() throws IOException { + when(buffer.getSize()).thenReturn(maxBytes.getBytes() - 1000); + when(buffer.getEventCount()).thenReturn(maxEvents + 1); + when(buffer.getDuration()).thenReturn(maxCollectionDuration.minusSeconds(1)); + Boolean isBatchEnabled = false; + + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(buffer, maxEvents, + maxBytes, maxCollectionDuration, isBatchEnabled); + + assertTrue(isThresholdExceed); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java new file mode 100644 index 0000000000..d161b28bb0 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferFactoryTest.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumulator; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +class InMemoryBufferFactoryTest { + + @Test + void test_inMemoryBufferFactory_notNull(){ + InMemoryBufferFactory inMemoryBufferFactory = new InMemoryBufferFactory(); + Assertions.assertNotNull(inMemoryBufferFactory); + } + + @Test + void test_buffer_notNull(){ + InMemoryBufferFactory inMemoryBufferFactory = new InMemoryBufferFactory(); + Assertions.assertNotNull(inMemoryBufferFactory); + Buffer buffer = inMemoryBufferFactory.getBuffer(null, null, null); + Assertions.assertNotNull(buffer); + assertThat(buffer, instanceOf(Buffer.class)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java new file mode 100644 index 0000000000..478650a300 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/accumulator/InMemoryBufferTest.java @@ -0,0 +1,167 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.accumulator; + +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBuffer; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.model.InvokeRequest; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; + +import java.io.IOException; +import java.io.OutputStream; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class InMemoryBufferTest { + + public static final int MAX_EVENTS = 55; + @Mock + private LambdaClient lambdaClient; + + private final String functionName = "testFunction"; + + private final String invocationType = "Event"; + + private InMemoryBuffer inMemoryBuffer; + + @Test + void test_with_write_event_into_buffer() throws IOException { + inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); + + while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { + OutputStream outputStream = inMemoryBuffer.getOutputStream(); + outputStream.write(generateByteArray()); + int eventCount = inMemoryBuffer.getEventCount() +1; + inMemoryBuffer.setEventCount(eventCount); + } + assertThat(inMemoryBuffer.getSize(), greaterThanOrEqualTo(54110L)); + assertThat(inMemoryBuffer.getEventCount(), equalTo(MAX_EVENTS)); + assertThat(inMemoryBuffer.getDuration(), notNullValue()); + assertThat(inMemoryBuffer.getDuration(), greaterThanOrEqualTo(Duration.ZERO)); + + } + + @Test + @Disabled("unstable") + /** + * There are 5 checkpoints in the tests as below + * |-----------upperBoundDuration-------------| + * startTime --- stopWatchStart --- endTime --- checkpoint --- stopwatchGetDuration + * |-lowerBoundDuration-| + * |------------inMemoryBuffer.Duration-------------| + * This test assumes the startTime and stopWatchStart are same, checkpoint and stopwatchGetDuration are same. + * However, they are not true at some systems. + */ + void getDuration_provides_duration_within_expected_range() throws IOException, InterruptedException { + Instant startTime = Instant.now(); + inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); + Instant endTime = Instant.now(); + + + while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { + OutputStream outputStream = inMemoryBuffer.getOutputStream(); + outputStream.write(generateByteArray()); + int eventCount = inMemoryBuffer.getEventCount() +1; + inMemoryBuffer.setEventCount(eventCount); + } + Thread.sleep(100); + + Instant durationCheckpointTime = Instant.now(); + Duration duration = inMemoryBuffer.getDuration(); + assertThat(duration, notNullValue()); + + Duration upperBoundDuration = Duration.between(startTime, durationCheckpointTime).truncatedTo(ChronoUnit.MILLIS); + Duration lowerBoundDuration = Duration.between(endTime, durationCheckpointTime).truncatedTo(ChronoUnit.MILLIS); + assertThat(duration, greaterThanOrEqualTo(lowerBoundDuration)); + assertThat(duration, lessThanOrEqualTo(upperBoundDuration)); + } + + @Test + void test_with_write_event_into_buffer_and_flush_toLambda() throws IOException { + + // Mock the response of the invoke method + InvokeResponse mockResponse = InvokeResponse.builder() + .statusCode(200) // HTTP 200 for successful invocation + .payload(SdkBytes.fromString("{\"key\": \"value\"}", java.nio.charset.StandardCharsets.UTF_8)) + .build(); + when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse); + + inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); + while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { + OutputStream outputStream = inMemoryBuffer.getOutputStream(); + outputStream.write(generateByteArray()); + int eventCount = inMemoryBuffer.getEventCount() +1; + inMemoryBuffer.setEventCount(eventCount); + } + assertDoesNotThrow(() -> { + inMemoryBuffer.flushToLambda(); + }); + } + + @Test + void test_uploadedToLambda_success() throws IOException { + // Mock the response of the invoke method + InvokeResponse mockResponse = InvokeResponse.builder() + .statusCode(200) // HTTP 200 for successful invocation + .payload(SdkBytes.fromString("{\"key\": \"value\"}", java.nio.charset.StandardCharsets.UTF_8)) + .build(); + when(lambdaClient.invoke(any(InvokeRequest.class))).thenReturn(mockResponse); + inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); + Assertions.assertNotNull(inMemoryBuffer); + OutputStream outputStream = inMemoryBuffer.getOutputStream(); + outputStream.write(generateByteArray()); + assertDoesNotThrow(() -> { + inMemoryBuffer.flushToLambda(); + }); + } + + @Test + void test_uploadedToLambda_fails() { + // Mock the response of the invoke method + InvokeResponse mockResponse = InvokeResponse.builder() + .statusCode(200) // HTTP 200 for successful invocation + .payload(SdkBytes.fromString("{\"key\": \"value\"}", java.nio.charset.StandardCharsets.UTF_8)) + .build(); + SdkClientException sdkClientException = mock(SdkClientException.class); + when(lambdaClient.invoke(any(InvokeRequest.class))) + .thenThrow(sdkClientException); + inMemoryBuffer = new InMemoryBuffer(lambdaClient, functionName, invocationType); + + Assertions.assertNotNull(inMemoryBuffer); + SdkClientException actualException = assertThrows(SdkClientException.class, () -> inMemoryBuffer.flushToLambda()); + assertThat(actualException, Matchers.equalTo(sdkClientException)); + } + + private byte[] generateByteArray() { + byte[] bytes = new byte[1000]; + for (int i = 0; i < 1000; i++) { + bytes[i] = (byte) i; + } + return bytes; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/JsonCodecTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/JsonCodecTest.java new file mode 100644 index 0000000000..1e28dba93e --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/codec/JsonCodecTest.java @@ -0,0 +1,111 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.codec; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeType; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class JsonCodecTest { + private ByteArrayOutputStream outputStream; + + private JsonCodec createObjectUnderTest() { + String key = "event"; + return new JsonCodec(key); + } + + @Test + void test_happy_case_with_null_codec_key() throws IOException { + JsonCodec jsonCodec = new JsonCodec(null); + + outputStream = new ByteArrayOutputStream(); + OutputCodecContext codecContext = new OutputCodecContext(); +// jsonCodec.start(outputStream, null, codecContext); + jsonCodec.start(outputStream, null, codecContext); + + final List> expectedData = generateRecords(1); + final Event event = convertToEvent(expectedData.get(0)); + jsonCodec.writeEvent(event, outputStream); + jsonCodec.complete(outputStream); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree(outputStream.toByteArray()); + assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + assertEquals(jsonNode.toString(),"{\"name\":\"Person0\",\"age\":0}"); + } + + + @Test + void test_happy_case_with_codec_key() throws IOException { + String key = "events"; + final int numberOfRecords = 2; + JsonCodec jsonCodec = new JsonCodec(key); + + outputStream = new ByteArrayOutputStream(); + OutputCodecContext codecContext = new OutputCodecContext(); + jsonCodec.start(outputStream, null, codecContext); + + final List> expectedData = generateRecords(numberOfRecords); + for (int index = 0; index < numberOfRecords; index++) { + final Event event = convertToEvent(expectedData.get(index)); + jsonCodec.writeEvent(event, outputStream); + } + jsonCodec.complete(outputStream); + + String expectedString = "{\"events\":[{\"name\":\"Person0\",\"age\":0},{\"name\":\"Person1\",\"age\":1}]}"; + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree(outputStream.toByteArray()); + assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + assertEquals(jsonNode.toString(),expectedString); + } + + private static Event convertToEvent(Map data) { + return JacksonLog.builder().withData(data).build(); + } + + private static List> generateRecords(int numberOfRecords) { + + List> recordList = new ArrayList<>(); + + for (int rows = 0; rows < numberOfRecords; rows++) { + + Map eventData = new HashMap<>(); + + eventData.put("name", "Person" + rows); + eventData.put("age", rows); + recordList.add(eventData); + + } + + return recordList; + } + + + private Object getValue(JsonNode jsonNode) { + if(jsonNode.isTextual()) + return jsonNode.asText(); + + if(jsonNode.isInt()) + return jsonNode.asInt(); + + throw new RuntimeException("Test not setup correctly."); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java new file mode 100644 index 0000000000..53bd0a4edf --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/config/ThresholdOptionsTest.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.lambda.config; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class ThresholdOptionsTest { + private static final String DEFAULT_BYTE_CAPACITY = "6mb"; + private static final int DEFAULT_EVENT_COUNT = 0; + + @Test + void test_default_byte_capacity_test() { + assertThat(new ThresholdOptions().getMaximumSize().getBytes(), + equalTo(ByteCount.parse(DEFAULT_BYTE_CAPACITY).getBytes())); + } + + @Test + void test_get_event_collection_duration_test() { + assertThat(new ThresholdOptions().getEventCollectTimeOut(), equalTo(null)); + } + + @Test + void test_get_event_count_test() { + assertThat(new ThresholdOptions().getEventCount(), equalTo(DEFAULT_EVENT_COUNT)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java new file mode 100644 index 0000000000..17f39973b7 --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/lambda/dlq/DlqPushHandlerTest.java @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.lambda.dlq; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.plugins.dlq.DlqProvider; +import org.opensearch.dataprepper.plugins.dlq.DlqWriter; +import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions; +import software.amazon.awssdk.core.SdkBytes; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class DlqPushHandlerTest { + + private static final String BUCKET = "bucket"; + private static final String BUCKET_VALUE = "test"; + private static final String ROLE = "arn:aws:iam::524239988122:role/app-test"; + + private static final String REGION = "ap-south-1"; + private static final String S3_PLUGIN_NAME = "s3"; + private static final String KEY_PATH_PREFIX = "key_path_prefix"; + + private static final String KEY_PATH_PREFIX_VALUE = "dlq/"; + + private static final String PIPELINE_NAME = "log-pipeline"; + + private static final String DLQ_FILE = "local_dlq_file"; + + private PluginModel pluginModel; + + private DlqPushHandler dlqPushHandler; + private PluginFactory pluginFactory; + + private AwsAuthenticationOptions awsAuthenticationOptions; + + private DlqProvider dlqProvider; + + private DlqWriter dlqWriter; + + + @BeforeEach + public void setUp(){ + this.pluginFactory = mock(PluginFactory.class); + this.pluginModel = mock(PluginModel.class); + this.awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + this.dlqProvider = mock(DlqProvider.class); + this.dlqWriter = mock(DlqWriter.class); + } + + @Test + void perform_for_dlq_s3_success() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SdkBytes payload = SdkBytes.fromUtf8String("{\"name\":\"dataprepper\"}"); + LambdaSinkFailedDlqData failedDlqData = new LambdaSinkFailedDlqData(payload,"message",0); + dlqPushHandler = new DlqPushHandler(pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } +} diff --git a/data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..23c33feb6d --- /dev/null +++ b/data-prepper-plugins/lambda-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1,3 @@ +# To enable mocking of final classes with vanilla Mockito +# https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods +mock-maker-inline diff --git a/settings.gradle b/settings.gradle index 8edc547186..5f992a25f8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -150,3 +150,4 @@ include 'data-prepper-plugins:buffer-common' include 'data-prepper-plugins:dissect-processor' include 'data-prepper-plugins:dynamodb-source' include 'data-prepper-plugins:decompress-processor' +include 'data-prepper-plugins:lambda-sink'