diff --git a/data-prepper-plugins/aws-lambda/build.gradle b/data-prepper-plugins/aws-lambda/build.gradle index 6186ba05a4..4ad8209198 100644 --- a/data-prepper-plugins/aws-lambda/build.gradle +++ b/data-prepper-plugins/aws-lambda/build.gradle @@ -62,13 +62,18 @@ task integrationTest(type: Test) { classpath = sourceSets.integrationTest.runtimeClasspath systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties' - systemProperty 'tests.lambda.sink.region', System.getProperty('tests.lambda.sink.region') - systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName') - systemProperty 'tests.lambda.sink.sts_role_arn', System.getProperty('tests.lambda.sink.sts_role_arn') - systemProperty 'tests.lambda.processor.region', System.getProperty('tests.lambda.processor.region') - systemProperty 'tests.lambda.processor.functionName', System.getProperty('tests.lambda.processor.functionName') - systemProperty 'tests.lambda.processor.sts_role_arn', System.getProperty('tests.lambda.processor.sts_role_arn') + //Enable Multi-thread in tests + systemProperty 'junit.jupiter.execution.parallel.enabled', 'true' + systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent' + + systemProperty 'tests.lambda.sink.region', System.getenv('TESTS_LAMBDA_SINK_REGION') + systemProperty 'tests.lambda.sink.functionName', System.getenv('TESTS_LAMBDA_SINK_FUNCTION_NAME') + systemProperty 'tests.lambda.sink.sts_role_arn', System.getenv('TESTS_LAMBDA_SINK_STS_ROLE_ARN') + + systemProperty 'tests.lambda.processor.region', System.getenv('TESTS_LAMBDA_PROCESSOR_REGION') + systemProperty 'tests.lambda.processor.functionName', System.getenv('TESTS_LAMBDA_PROCESSOR_FUNCTION_NAME') + systemProperty 'tests.lambda.processor.sts_role_arn', System.getenv('TESTS_LAMBDA_PROCESSOR_STS_ROLE_ARN') filter { includeTestsMatching '*IT' diff --git a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java new file mode 100644 index 0000000000..94b0be3c30 --- /dev/null +++ b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorIT.java @@ -0,0 +1,326 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.lambda.processor; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.event.DefaultEventMetadata; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.types.ByteCount; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; +import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType; +import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodec; +import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodecConfig; +import static org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig.DEFAULT_CONNECTION_TIMEOUT; + +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.lenient; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class LambdaProcessorIT { + private AwsCredentialsProvider awsCredentialsProvider; + private LambdaProcessor lambdaProcessor; + private LambdaProcessorConfig lambdaProcessorConfig; + private String functionName; + private String lambdaRegion; + private String role; + + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock + private PluginFactory pluginFactory; + @Mock + private PluginMetrics pluginMetrics; + @Mock + private ExpressionEvaluator expressionEvaluator; + @Mock + private Counter testCounter; + @Mock + private Timer testTimer; + @Mock + InvocationType invocationType; + + private LambdaProcessor createObjectUnderTest(LambdaProcessorConfig processorConfig) { + return new LambdaProcessor(pluginFactory, pluginMetrics, processorConfig, awsCredentialsSupplier, expressionEvaluator); + } + + @BeforeEach + public void setup() { +// lambdaRegion = System.getProperty("tests.lambda.processor.region"); +// functionName = System.getProperty("tests.lambda.processor.functionName"); +// role = System.getProperty("tests.lambda.processor.sts_role_arn"); + lambdaRegion="us-east-1"; + functionName="test-lambda-processor"; + role="arn:aws:iam::176893235612:role/osis-lambda-role"; + + pluginMetrics = mock(PluginMetrics.class); + when(pluginMetrics.gauge(any(), any(AtomicLong.class))).thenReturn(new AtomicLong()); + testCounter = mock(Counter.class); + try { + lenient().doAnswer(args -> { + return null; + }).when(testCounter).increment(any(Double.class)); + } catch (Exception e){} + try { + lenient().doAnswer(args -> { + return null; + }).when(testTimer).record(any(Long.class), any(TimeUnit.class)); + } catch (Exception e){} + when(pluginMetrics.counter(any())).thenReturn(testCounter); + testTimer = mock(Timer.class); + when(pluginMetrics.timer(any())).thenReturn(testTimer); + lambdaProcessorConfig = mock(LambdaProcessorConfig.class); + expressionEvaluator = mock(ExpressionEvaluator.class); + awsCredentialsProvider = DefaultCredentialsProvider.create(); + when(awsCredentialsSupplier.getProvider(any())).thenReturn(awsCredentialsProvider); + pluginFactory = mock(PluginFactory.class); + JsonInputCodecConfig jsonInputCodecConfig = mock(JsonInputCodecConfig.class); + when(jsonInputCodecConfig.getKeyName()).thenReturn(null); + when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(null); + when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(null); + InputCodec responseCodec = new JsonInputCodec(jsonInputCodecConfig); + when(pluginFactory.loadPlugin(eq(InputCodec.class), any(PluginSetting.class))).thenReturn(responseCodec); + + when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName); + when(lambdaProcessorConfig.getWhenCondition()).thenReturn(null); + when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); + BatchOptions batchOptions = mock(BatchOptions.class); + when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions); + when(lambdaProcessorConfig.getTagsOnFailure()).thenReturn(null); + invocationType = mock(InvocationType.class); + when(lambdaProcessorConfig.getInvocationType()).thenReturn(invocationType); + when(lambdaProcessorConfig.getResponseCodecConfig()).thenReturn(null); + when(lambdaProcessorConfig.getConnectionTimeout()).thenReturn(DEFAULT_CONNECTION_TIMEOUT); + ThresholdOptions thresholdOptions = mock(ThresholdOptions.class); + when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions); + when(batchOptions.getKeyName()).thenReturn("osi_key"); + when(thresholdOptions.getEventCount()).thenReturn(ThresholdOptions.DEFAULT_EVENT_COUNT); + when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse(ThresholdOptions.DEFAULT_BYTE_CAPACITY)); + when(thresholdOptions.getEventCollectTimeOut()).thenReturn(ThresholdOptions.DEFAULT_EVENT_TIMEOUT); + AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(lambdaRegion)); + when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(role); + when(awsAuthenticationOptions.getAwsStsExternalId()).thenReturn(null); + when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(null); + when(lambdaProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + + } + + @ParameterizedTest + //@ValueSource(ints = {2, 5, 10, 100, 1000}) + @ValueSource(ints = {1000}) + public void testRequestResponseWithMatchingEventsStrictMode(int numRecords) { + when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue()); + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); + lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig); + List> records = createRecords(numRecords); + + Collection> results = lambdaProcessor.doExecute(records); + + assertThat(results.size(), equalTo(numRecords)); + validateStrictModeResults(results); + } + + + @ParameterizedTest + //@ValueSource(ints = {2, 5, 10, 100, 1000}) + @ValueSource(ints = {1000}) + public void testRequestResponseWithMatchingEventsAggregateMode(int numRecords) { + when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue()); + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false); + lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig); + List> records = createRecords(numRecords); + + Collection> results = lambdaProcessor.doExecute(records); + + assertThat(results.size(), equalTo(numRecords)); + validateResultsForAggregateMode(results ); + } + + @ParameterizedTest + @ValueSource(ints = {1000}) + public void testRequestResponse_WithMatchingEvents_StrictMode_WithMultipleThreads(int numRecords) throws InterruptedException { + when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue()); + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); + lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig); + + int numThreads = 5; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + CountDownLatch latch = new CountDownLatch(numThreads); + + List> records = createRecords(numRecords); + + for (int i = 0; i < numThreads; i++) { + executorService.submit(() -> { + try { + Collection> results = lambdaProcessor.doExecute(records); + assertThat(results.size(), equalTo(numRecords)); + validateStrictModeResults(results); + } finally { + latch.countDown(); + } + }); + } + + latch.await(5, TimeUnit.MINUTES); + executorService.shutdown(); + } + + @ParameterizedTest + @ValueSource(strings = {"RequestResponse", "Event"}) + public void testDifferentInvocationTypes(String invocationType) { + when(this.invocationType.getAwsLambdaValue()).thenReturn(invocationType); + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); + + lambdaProcessor = createObjectUnderTest(lambdaProcessorConfig); + List> records = createRecords(10); + + Collection> results = lambdaProcessor.doExecute(records); + + if (invocationType.equals("RequestResponse")) { + assertThat(results.size(), equalTo(10)); + validateStrictModeResults(results); + } else { + // For "Event" invocation type + assertThat(results.size(), equalTo(0)); + } + } + + @Test + public void testWithFailureTags() { + when(invocationType.getAwsLambdaValue()).thenReturn(InvocationType.REQUEST_RESPONSE.getAwsLambdaValue()); + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false); + when(lambdaProcessorConfig.getTagsOnFailure()).thenReturn(Collections.singletonList("lambda_failure")); + + LambdaProcessor spyLambdaProcessor = spy(createObjectUnderTest(lambdaProcessorConfig)); + + doThrow(new RuntimeException("Simulated Lambda failure")) + .when(spyLambdaProcessor).convertLambdaResponseToEvent(any(Buffer.class), any(InvokeResponse.class)); + + List> records = createRecords(5); + + Collection> results = spyLambdaProcessor.doExecute(records); + + assertThat(results.size(), equalTo(5)); + for (Record record : results) { + assertThat(record.getData().getMetadata().getTags().contains("lambda_failure"), equalTo(true)); + } + } + + private void validateResultsForAggregateMode(Collection> results) { + List> resultRecords = new ArrayList<>(results); + for (int i = 0; i < resultRecords.size(); i++) { + Event event = resultRecords.get(i).getData(); + Map eventData = event.toMap(); + + // Check if the event contains the expected data + assertThat(eventData.containsKey("id"), equalTo(true)); + int id = (Integer) eventData.get("id"); + assertThat(eventData.get("key" + id), equalTo(id)); + String stringValue = "value" + id; + assertThat(eventData.get("keys" + id), equalTo(stringValue.toUpperCase())); + + // Check that there's no metadata or it's empty + EventMetadata metadata = event.getMetadata(); + if (metadata != null) { + assertThat(metadata.getAttributes().isEmpty(), equalTo(true)); + assertThat(metadata.getTags().isEmpty(), equalTo(true)); + } + } + } + + private void validateStrictModeResults(Collection> results) { + List> resultRecords = new ArrayList<>(results); + for (int i = 0; i < resultRecords.size(); i++) { + Map eventData = resultRecords.get(i).getData().toMap(); + Map attr = resultRecords.get(i).getData().getMetadata().getAttributes(); + int id = (Integer)eventData.get("id"); + assertThat(eventData.get("key"+id), equalTo(id)); + String stringValue = "value"+id; + assertThat(eventData.get("keys"+id), equalTo(stringValue.toUpperCase())); + assertThat(attr.get("attr"+id), equalTo(id)); + assertThat(attr.get("attrs"+id), equalTo("attrvalue"+id)); + } + } + + private List> createRecords(int numRecords) { + List> records = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + Map map = new HashMap<>(); + map.put("id", i); + map.put("key"+i, i); + map.put("keys"+i, "value"+i); + Map attrs = new HashMap<>(); + attrs.put("attr"+i, i); + attrs.put("attrs"+i, "attrvalue"+i); + EventMetadata metadata = DefaultEventMetadata.builder() + .withEventType("event") + .withAttributes(attrs) + .build(); + final Event event = JacksonEvent.builder() + .withData(map) + .withEventType("event") + .withEventMetadata(metadata) + .build(); + records.add(new Record<>(event)); + } + return records; + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java index a5f0155828..978861afe4 100644 --- a/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java +++ b/data-prepper-plugins/aws-lambda/src/integrationTest/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorServiceIT.java @@ -1,164 +1,165 @@ -package org.opensearch.dataprepper.plugins.lambda.processor; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.when; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; -import io.micrometer.core.instrument.Counter; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.log.JacksonLog; -import org.opensearch.dataprepper.model.plugin.PluginFactory; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; -import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; -import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType; -import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.lambda.LambdaAsyncClient; - -@ExtendWith(MockitoExtension.class) -public class LambdaProcessorServiceIT { - - private final ObjectMapper objectMapper = new ObjectMapper( - new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); - private LambdaAsyncClient lambdaAsyncClient; - private String functionName; - private String lambdaRegion; - private String role; - @Mock - private LambdaProcessorConfig lambdaProcessorConfig; - @Mock - private BatchOptions batchOptions; - @Mock - private ThresholdOptions thresholdOptions; - @Mock - private AwsAuthenticationOptions awsAuthenticationOptions; - @Mock - private AwsCredentialsSupplier awsCredentialsSupplier; - @Mock - private PluginMetrics pluginMetrics; - @Mock - private PluginFactory pluginFactory; - @Mock - private PluginSetting pluginSetting; - @Mock - private Counter numberOfRecordsSuccessCounter; - @Mock - private Counter numberOfRecordsFailedCounter; - @Mock - private ExpressionEvaluator expressionEvaluator; - - private static Record createRecord() { - final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build(); - return new Record<>(event); - } - - private static Collection> generateRecords(int numberOfRecords) { - List> recordList = new ArrayList<>(); - - for (int rows = 1; rows <= numberOfRecords; rows++) { - HashMap eventData = new HashMap<>(); - eventData.put("name", "Person" + rows); - eventData.put("age", Integer.toString(rows)); - - Record eventRecord = new Record<>( - JacksonEvent.builder().withData(eventData).withEventType("event").build()); - recordList.add(eventRecord); - } - return recordList; - } - - @BeforeEach - public void setUp() throws Exception { - MockitoAnnotations.openMocks(this); - lambdaRegion = System.getProperty("tests.lambda.processor.region"); - functionName = System.getProperty("tests.lambda.processor.functionName"); - role = System.getProperty("tests.lambda.processor.sts_role_arn"); - - final Region region = Region.of(lambdaRegion); - - lambdaAsyncClient = LambdaAsyncClient.builder() - .region(Region.of(lambdaRegion)) - .build(); - - when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)). - thenReturn(numberOfRecordsSuccessCounter); - when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)). - thenReturn(numberOfRecordsFailedCounter); - } - - public LambdaProcessor createObjectUnderTest(final String config) throws JsonProcessingException { - - final LambdaProcessorConfig lambdaProcessorConfig = objectMapper.readValue(config, - LambdaProcessorConfig.class); - return new LambdaProcessor(pluginFactory, pluginMetrics, lambdaProcessorConfig, - awsCredentialsSupplier, expressionEvaluator); - } - - public LambdaProcessor createObjectUnderTest(LambdaProcessorConfig lambdaSinkConfig) - throws JsonProcessingException { - return new LambdaProcessor(pluginFactory, pluginMetrics, lambdaSinkConfig, - awsCredentialsSupplier, expressionEvaluator); - } - - @ParameterizedTest - @ValueSource(ints = {1, 3}) - void verify_records_to_lambda_success(final int recordCount) throws Exception { - - when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName); - when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); - when(lambdaProcessorConfig.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE); - - LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig); - - Collection> recordsData = generateRecords(recordCount); - List> recordsResult = (List>) objectUnderTest.doExecute( - recordsData); - Thread.sleep(Duration.ofSeconds(10).toMillis()); - - assertEquals(recordsResult.size(), recordCount); - } - - @ParameterizedTest - @ValueSource(ints = {1, 3}) - void verify_records_with_batching_to_lambda(final int recordCount) - throws JsonProcessingException, InterruptedException { - - when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName); - when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); - when(lambdaProcessorConfig.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE); - when(thresholdOptions.getEventCount()).thenReturn(1); - when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb")); - when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s")); - when(batchOptions.getKeyName()).thenReturn("lambda_batch_key"); - when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions); - when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions); - - LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig); - Collection> records = generateRecords(recordCount); - Collection> recordsResult = objectUnderTest.doExecute(records); - Thread.sleep(Duration.ofSeconds(10).toMillis()); - assertEquals(recordsResult.size(), recordCount); - } -} \ No newline at end of file +//package org.opensearch.dataprepper.plugins.lambda.processor; +// +//import com.fasterxml.jackson.core.JsonProcessingException; +//import com.fasterxml.jackson.databind.ObjectMapper; +//import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +//import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +//import io.micrometer.core.instrument.Counter; +//import static org.junit.jupiter.api.Assertions.assertEquals; +//import org.junit.jupiter.api.BeforeEach; +//import org.junit.jupiter.api.extension.ExtendWith; +//import org.junit.jupiter.params.ParameterizedTest; +//import org.junit.jupiter.params.provider.ValueSource; +//import org.mockito.Mock; +//import static org.mockito.Mockito.when; +//import org.mockito.MockitoAnnotations; +//import org.mockito.junit.jupiter.MockitoExtension; +//import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +//import org.opensearch.dataprepper.expression.ExpressionEvaluator; +//import org.opensearch.dataprepper.metrics.PluginMetrics; +//import org.opensearch.dataprepper.model.configuration.PluginSetting; +//import org.opensearch.dataprepper.model.event.Event; +//import org.opensearch.dataprepper.model.event.JacksonEvent; +//import org.opensearch.dataprepper.model.log.JacksonLog; +//import org.opensearch.dataprepper.model.plugin.PluginFactory; +//import org.opensearch.dataprepper.model.record.Record; +//import org.opensearch.dataprepper.model.types.ByteCount; +//import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory; +//import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory; +//import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +//import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; +//import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType; +//import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; +//import software.amazon.awssdk.regions.Region; +//import software.amazon.awssdk.services.lambda.LambdaAsyncClient; +// +//import java.time.Duration; +//import java.util.ArrayList; +//import java.util.Collection; +//import java.util.HashMap; +//import java.util.List; +// +//@ExtendWith(MockitoExtension.class) +//public class LambdaProcessorServiceIT { +// +// private LambdaAsyncClient lambdaAsyncClient; +// private String functionName; +// private String lambdaRegion; +// private String role; +// private BufferFactory bufferFactory; +// @Mock +// private LambdaProcessorConfig lambdaProcessorConfig; +// @Mock +// private BatchOptions batchOptions; +// @Mock +// private ThresholdOptions thresholdOptions; +// @Mock +// private AwsAuthenticationOptions awsAuthenticationOptions; +// @Mock +// private AwsCredentialsSupplier awsCredentialsSupplier; +// @Mock +// private PluginMetrics pluginMetrics; +// @Mock +// private PluginFactory pluginFactory; +// @Mock +// private PluginSetting pluginSetting; +// @Mock +// private Counter numberOfRecordsSuccessCounter; +// @Mock +// private Counter numberOfRecordsFailedCounter; +// @Mock +// private ExpressionEvaluator expressionEvaluator; +// +// private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); +// +// +// @BeforeEach +// public void setUp() throws Exception { +// MockitoAnnotations.openMocks(this); +// lambdaRegion = System.getProperty("tests.lambda.processor.region"); +// functionName = System.getProperty("tests.lambda.processor.functionName"); +// role = System.getProperty("tests.lambda.processor.sts_role_arn"); +// +// final Region region = Region.of(lambdaRegion); +// +// lambdaAsyncClient = LambdaAsyncClient.builder() +// .region(Region.of(lambdaRegion)) +// .build(); +// +// bufferFactory = new InMemoryBufferFactory(); +// +// when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)). +// thenReturn(numberOfRecordsSuccessCounter); +// when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)). +// thenReturn(numberOfRecordsFailedCounter); +// } +// +// +// private static Record createRecord() { +// final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build(); +// return new Record<>(event); +// } +// +// public LambdaProcessor createObjectUnderTest(final String config) throws JsonProcessingException { +// +// final LambdaProcessorConfig lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class); +// return new LambdaProcessor(pluginFactory,pluginMetrics,lambdaProcessorConfig,awsCredentialsSupplier,expressionEvaluator); +// } +// +// public LambdaProcessor createObjectUnderTest(LambdaProcessorConfig lambdaSinkConfig) throws JsonProcessingException { +// return new LambdaProcessor(pluginFactory,pluginMetrics,lambdaSinkConfig,awsCredentialsSupplier,expressionEvaluator); +// } +// +// +// private static Collection> generateRecords(int numberOfRecords) { +// List> recordList = new ArrayList<>(); +// +// for (int rows = 1; rows <= numberOfRecords; rows++) { +// HashMap eventData = new HashMap<>(); +// eventData.put("name", "Person" + rows); +// eventData.put("age", Integer.toString(rows)); +// +// Record eventRecord = new Record<>(JacksonEvent.builder().withData(eventData).withEventType("event").build()); +// recordList.add(eventRecord); +// } +// return recordList; +// } +// +// @ParameterizedTest +// @ValueSource(ints = {1,3}) +// void verify_records_to_lambda_success(final int recordCount) throws Exception { +// +// when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName); +// when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); +// when(lambdaProcessorConfig.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE); +// +// LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig); +// +// Collection> recordsData = generateRecords(recordCount); +// List> recordsResult = (List>) objectUnderTest.doExecute(recordsData); +// Thread.sleep(Duration.ofSeconds(10).toMillis()); +// +// assertEquals(recordsResult.size(),recordCount); +// } +// +// @ParameterizedTest +// @ValueSource(ints = {1,3}) +// void verify_records_with_batching_to_lambda(final int recordCount) throws JsonProcessingException, InterruptedException { +// +// when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName); +// when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); +// when(lambdaProcessorConfig.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE); +// when(thresholdOptions.getEventCount()).thenReturn(1); +// when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb")); +// when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s")); +// when(batchOptions.getKeyName()).thenReturn("lambda_batch_key"); +// when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions); +// when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions); +// +// LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig); +// Collection> records = generateRecords(recordCount); +// Collection> recordsResult = objectUnderTest.doExecute(records); +// Thread.sleep(Duration.ofSeconds(10).toMillis()); +// assertEquals(recordsResult.size(),recordCount); +// } +//} diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java index a5f638e414..84ae0255f8 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java @@ -45,189 +45,189 @@ @DataPrepperPlugin(name = "aws_lambda", pluginType = Processor.class, pluginConfigurationType = LambdaProcessorConfig.class) public class LambdaProcessor extends AbstractProcessor, Record> { - public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaProcessorObjectsEventsSucceeded"; - public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaProcessorObjectsEventsFailed"; - public static final String LAMBDA_LATENCY_METRIC = "lambdaProcessorLatency"; - public static final String REQUEST_PAYLOAD_SIZE = "requestPayloadSize"; - public static final String RESPONSE_PAYLOAD_SIZE = "responsePayloadSize"; - - private static final Logger LOG = LoggerFactory.getLogger(LambdaProcessor.class); - final PluginSetting codecPluginSetting; - final PluginFactory pluginFactory; - final LambdaProcessorConfig lambdaProcessorConfig; - private final String whenCondition; - private final ExpressionEvaluator expressionEvaluator; - private final Counter numberOfRecordsSuccessCounter; - private final Counter numberOfRecordsFailedCounter; - private final Timer lambdaLatencyMetric; - private final List tagsOnMatchFailure; - private final LambdaAsyncClient lambdaAsyncClient; - private final AtomicLong requestPayloadMetric; - private final AtomicLong responsePayloadMetric; - private final ResponseEventHandlingStrategy responseStrategy; - private final JsonOutputCodecConfig jsonOutputCodecConfig; - LambdaCommonHandler lambdaCommonHandler; - - @DataPrepperPluginConstructor - public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pluginMetrics, - final LambdaProcessorConfig lambdaProcessorConfig, - final AwsCredentialsSupplier awsCredentialsSupplier, - final ExpressionEvaluator expressionEvaluator) { - super(pluginMetrics); - this.expressionEvaluator = expressionEvaluator; - this.pluginFactory = pluginFactory; - this.lambdaProcessorConfig = lambdaProcessorConfig; - this.numberOfRecordsSuccessCounter = pluginMetrics.counter( - NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS); - this.numberOfRecordsFailedCounter = pluginMetrics.counter( - NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED); - this.lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC); - this.requestPayloadMetric = pluginMetrics.gauge(REQUEST_PAYLOAD_SIZE, new AtomicLong()); - this.responsePayloadMetric = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong()); - this.whenCondition = lambdaProcessorConfig.getWhenCondition(); - - tagsOnMatchFailure = lambdaProcessorConfig.getTagsOnMatchFailure(); - - PluginModel responseCodecConfig = lambdaProcessorConfig.getResponseCodecConfig(); - - if (responseCodecConfig == null) { - // Default to JsonInputCodec with default settings - codecPluginSetting = new PluginSetting("json", Collections.emptyMap()); - } else { - codecPluginSetting = new PluginSetting(responseCodecConfig.getPluginName(), - responseCodecConfig.getPluginSettings()); - } + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaProcessorObjectsEventsSucceeded"; + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaProcessorObjectsEventsFailed"; + public static final String LAMBDA_LATENCY_METRIC = "lambdaProcessorLatency"; + public static final String REQUEST_PAYLOAD_SIZE = "requestPayloadSize"; + public static final String RESPONSE_PAYLOAD_SIZE = "responsePayloadSize"; + + private static final Logger LOG = LoggerFactory.getLogger(LambdaProcessor.class); + final PluginSetting codecPluginSetting; + final PluginFactory pluginFactory; + final LambdaProcessorConfig lambdaProcessorConfig; + private final String whenCondition; + private final ExpressionEvaluator expressionEvaluator; + private final Counter numberOfRecordsSuccessCounter; + private final Counter numberOfRecordsFailedCounter; + private final Timer lambdaLatencyMetric; + private final List tagsOnMatchFailure; + private final LambdaAsyncClient lambdaAsyncClient; + private final AtomicLong requestPayloadMetric; + private final AtomicLong responsePayloadMetric; + private final ResponseEventHandlingStrategy responseStrategy; + private final JsonOutputCodecConfig jsonOutputCodecConfig; + LambdaCommonHandler lambdaCommonHandler; + + @DataPrepperPluginConstructor + public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pluginMetrics, + final LambdaProcessorConfig lambdaProcessorConfig, + final AwsCredentialsSupplier awsCredentialsSupplier, + final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics); + this.expressionEvaluator = expressionEvaluator; + this.pluginFactory = pluginFactory; + this.lambdaProcessorConfig = lambdaProcessorConfig; + this.numberOfRecordsSuccessCounter = pluginMetrics.counter( + NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS); + this.numberOfRecordsFailedCounter = pluginMetrics.counter( + NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED); + this.lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC); + this.requestPayloadMetric = pluginMetrics.gauge(REQUEST_PAYLOAD_SIZE, new AtomicLong()); + this.responsePayloadMetric = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong()); + this.whenCondition = lambdaProcessorConfig.getWhenCondition(); + + tagsOnMatchFailure = lambdaProcessorConfig.getTagsOnFailure(); + + PluginModel responseCodecConfig = lambdaProcessorConfig.getResponseCodecConfig(); + + if (responseCodecConfig == null) { + // Default to JsonInputCodec with default settings + codecPluginSetting = new PluginSetting("json", Collections.emptyMap()); + } else { + codecPluginSetting = new PluginSetting(responseCodecConfig.getPluginName(), + responseCodecConfig.getPluginSettings()); + } - jsonOutputCodecConfig = new JsonOutputCodecConfig(); - jsonOutputCodecConfig.setKeyName(lambdaProcessorConfig.getBatchOptions().getKeyName()); + jsonOutputCodecConfig = new JsonOutputCodecConfig(); + jsonOutputCodecConfig.setKeyName(lambdaProcessorConfig.getBatchOptions().getKeyName()); - lambdaAsyncClient = LambdaClientFactory.createAsyncLambdaClient( - lambdaProcessorConfig.getAwsAuthenticationOptions(), - lambdaProcessorConfig.getMaxConnectionRetries(), awsCredentialsSupplier, - lambdaProcessorConfig.getConnectionTimeout()); + lambdaAsyncClient = LambdaClientFactory.createAsyncLambdaClient( + lambdaProcessorConfig.getAwsAuthenticationOptions(), + lambdaProcessorConfig.getMaxConnectionRetries(), awsCredentialsSupplier, + lambdaProcessorConfig.getConnectionTimeout()); + + // Select the correct strategy based on the configuration + if (lambdaProcessorConfig.getResponseEventsMatch()) { + this.responseStrategy = new StrictResponseEventHandlingStrategy(); + } else { + this.responseStrategy = new AggregateResponseEventHandlingStrategy(); + } - // Select the correct strategy based on the configuration - if (lambdaProcessorConfig.getResponseEventsMatch()) { - this.responseStrategy = new StrictResponseEventHandlingStrategy(); - } else { - this.responseStrategy = new AggregateResponseEventHandlingStrategy(); } - } + @Override + public Collection> doExecute(Collection> records) { + if (records.isEmpty()) { + return records; + } - @Override - public Collection> doExecute(Collection> records) { - if (records.isEmpty()) { - return records; + List> resultRecords = Collections.synchronizedList(new ArrayList()); + List> recordsToLambda = new ArrayList<>(); + for (Record record : records) { + final Event event = record.getData(); + // If the condition is false, add the event to resultRecords as-is + if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) { + resultRecords.add(record); + continue; + } + recordsToLambda.add(record); + } + resultRecords.addAll( + lambdaCommonHandler.sendRecords(recordsToLambda, lambdaProcessorConfig, lambdaAsyncClient, + (inputBuffer, response) -> convertLambdaResponseToEvent(inputBuffer, response), + (inputBuffer, outputRecords) -> { + addFailureTags(inputBuffer, outputRecords); + }) + ); + return resultRecords; } - List> resultRecords = Collections.synchronizedList(new ArrayList()); - List> recordsToLambda = new ArrayList<>(); - for (Record record : records) { - final Event event = record.getData(); - // If the condition is false, add the event to resultRecords as-is - if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) { - resultRecords.add(record); - continue; - } - recordsToLambda.add(record); - } - resultRecords.addAll( - lambdaCommonHandler.sendRecords(recordsToLambda, lambdaProcessorConfig, lambdaAsyncClient, - (inputBuffer, response) -> convertLambdaResponseToEvent(inputBuffer, response), - (inputBuffer, outputRecords) -> { - addFailureTags(inputBuffer, outputRecords); - }) - ); - return resultRecords; - } - - /* - * Assumption: Lambda always returns json array. - * 1. If response has an array, we assume that we split the individual events. - * 2. If it is not an array, then create one event per response. - */ - List> convertLambdaResponseToEvent(Buffer flushedBuffer, - final InvokeResponse lambdaResponse) { - InputCodec responseCodec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSetting); - List> originalRecords = flushedBuffer.getRecords(); - try { - List parsedEvents = new ArrayList<>(); - - List> resultRecords = new ArrayList<>(); - SdkBytes payload = lambdaResponse.payload(); - // Handle null or empty payload - if (payload == null || payload.asByteArray() == null || payload.asByteArray().length == 0) { - LOG.warn(NOISY, "Lambda response payload is null or empty, dropping the original events"); - // Set metrics - //requestPayloadMetric.set(flushedBuffer.getPayloadRequestSize()); - //responsePayloadMetric.set(0); - } else { - // Set metrics - //requestPayloadMetric.set(flushedBuffer.getPayloadRequestSize()); - //responsePayloadMetric.set(payload.asByteArray().length); - - LOG.debug("Response payload:{}", payload.asUtf8String()); - InputStream inputStream = new ByteArrayInputStream(payload.asByteArray()); - //Convert to response codec + /* + * Assumption: Lambda always returns json array. + * 1. If response has an array, we assume that we split the individual events. + * 2. If it is not an array, then create one event per response. + */ + List> convertLambdaResponseToEvent(Buffer flushedBuffer, + final InvokeResponse lambdaResponse) { + InputCodec responseCodec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSetting); + List> originalRecords = flushedBuffer.getRecords(); try { - responseCodec.parse(inputStream, record -> { - Event event = record.getData(); - parsedEvents.add(event); - }); - } catch (IOException ex) { - throw new RuntimeException(ex); + List parsedEvents = new ArrayList<>(); + + List> resultRecords = new ArrayList<>(); + SdkBytes payload = lambdaResponse.payload(); + // Handle null or empty payload + if (payload == null || payload.asByteArray() == null || payload.asByteArray().length == 0) { + LOG.warn(NOISY, "Lambda response payload is null or empty, dropping the original events"); + // Set metrics + //requestPayloadMetric.set(flushedBuffer.getPayloadRequestSize()); + //responsePayloadMetric.set(0); + } else { + // Set metrics + //requestPayloadMetric.set(flushedBuffer.getPayloadRequestSize()); + //responsePayloadMetric.set(payload.asByteArray().length); + + LOG.debug("Response payload:{}", payload.asUtf8String()); + InputStream inputStream = new ByteArrayInputStream(payload.asByteArray()); + //Convert to response codec + try { + responseCodec.parse(inputStream, record -> { + Event event = record.getData(); + parsedEvents.add(event); + }); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + + LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, " + + "FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(), + flushedBuffer.getSize()); + responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, flushedBuffer); + + } + return resultRecords; + } catch (Exception e) { + LOG.error(NOISY, "Error converting Lambda response to Event"); + // Metrics update + //requestPayloadMetric.set(flushedBuffer.getPayloadRequestSize()); + //responsePayloadMetric.set(0); + addFailureTags(flushedBuffer, originalRecords); + return originalRecords; + //????? handleFailure(e, flushedBuffer, resultRecords, failureHandler); } - - LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, " + - "FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(), - flushedBuffer.getSize()); - responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, flushedBuffer); - - } - return resultRecords; - } catch (Exception e) { - LOG.error(NOISY, "Error converting Lambda response to Event"); - // Metrics update - //requestPayloadMetric.set(flushedBuffer.getPayloadRequestSize()); - //responsePayloadMetric.set(0); - addFailureTags(flushedBuffer, originalRecords); - return originalRecords; - //????? handleFailure(e, flushedBuffer, resultRecords, failureHandler); } - } - - /* - * If one event in the Buffer fails, we consider that the entire - * Batch fails and tag each event in that Batch. - */ - - private void addFailureTags(Buffer flushedBuffer, List> resultRecords) { - // Add failure tags to each event in the batch - for (Record record : flushedBuffer.getRecords()) { - Event event = record.getData(); - EventMetadata metadata = event.getMetadata(); - if (metadata != null) { - metadata.addTags(tagsOnMatchFailure); - } else { - LOG.warn("Event metadata is null, cannot add failure tags."); - } - resultRecords.add(record); + + /* + * If one event in the Buffer fails, we consider that the entire + * Batch fails and tag each event in that Batch. + */ + + private void addFailureTags(Buffer flushedBuffer, List> resultRecords) { + // Add failure tags to each event in the batch + for (Record record : flushedBuffer.getRecords()) { + Event event = record.getData(); + EventMetadata metadata = event.getMetadata(); + if (metadata != null) { + metadata.addTags(tagsOnMatchFailure); + } else { + LOG.warn("Event metadata is null, cannot add failure tags."); + } + resultRecords.add(record); + } } - } - @Override - public void prepareForShutdown() { - } + @Override + public void prepareForShutdown() { + } - @Override - public boolean isReadyForShutdown() { - return true; - } + @Override + public boolean isReadyForShutdown() { + return true; + } - @Override - public void shutdown() { - } + @Override + public void shutdown() { + } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java index d88f396388..21ae307d17 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java @@ -22,14 +22,14 @@ public class LambdaProcessorConfig extends LambdaCommonConfig { @JsonProperty("lambda_when") private String whenCondition; - @JsonProperty("tags_on_match_failure") + @JsonProperty("tags_on_failure") @JsonPropertyDescription("A List of Strings that specifies the tags to be set in the event when lambda fails to " + "or exception occurs. This tag may be used in conditional expressions in " + "other parts of the configuration") - private List tagsOnMatchFailure = Collections.emptyList(); + private List tagsOnFailure = Collections.emptyList(); - public List getTagsOnMatchFailure(){ - return tagsOnMatchFailure; + public List getTagsOnFailure(){ + return tagsOnFailure; } public String getWhenCondition() { diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java index c6d2202ccb..d38000cdb2 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfigTest.java @@ -26,7 +26,7 @@ public class LambdaProcessorConfigTest { @Test void test_defaults() { final LambdaProcessorConfig lambdaProcessorConfig = new LambdaProcessorConfig(); - assertThat(lambdaProcessorConfig.getTagsOnMatchFailure(), equalTo(List.of())); + assertThat(lambdaProcessorConfig.getTagsOnFailure(), equalTo(List.of())); assertThat(lambdaProcessorConfig.getWhenCondition(), equalTo(null)); assertThat(lambdaProcessorConfig.getResponseEventsMatch(), equalTo(false)); }