From 5d28955b7b78cd19f84cbfc847aef8dd89de6eab Mon Sep 17 00:00:00 2001 From: Srikanth Govindarajan Date: Wed, 30 Oct 2024 15:16:22 -0700 Subject: [PATCH] Change response_cardinality config for lambda processor to response_events_match (#5132) * Change response_cardinality config for lambda processor to response_events_match Signed-off-by: Srikanth Govindarajan * Address comments Signed-off-by: Srikanth Govindarajan --------- Signed-off-by: Srikanth Govindarajan --- .../plugins/lambda/processor/LambdaProcessor.java | 7 +++---- .../plugins/lambda/processor/LambdaProcessorConfig.java | 8 ++++---- .../plugins/lambda/processor/ResponseCardinality.java | 1 + .../plugins/lambda/processor/LambdaProcessorTest.java | 9 +++++---- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java index 5965b0555a..b1e74ed096 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java @@ -70,7 +70,6 @@ public class LambdaProcessor extends AbstractProcessor, Record tagsOnMatchFailure; private final BatchOptions batchOptions; private final BufferFactory bufferFactory; @@ -100,7 +99,6 @@ public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pl this.responsePayloadMetric = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong()); functionName = lambdaProcessorConfig.getFunctionName(); - responseCardinality = lambdaProcessorConfig.getResponseCardinality(); whenCondition = lambdaProcessorConfig.getWhenCondition(); maxRetries = lambdaProcessorConfig.getMaxConnectionRetries(); batchOptions = lambdaProcessorConfig.getBatchOptions(); @@ -131,7 +129,7 @@ public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pl bufferFactory = new InMemoryBufferFactory(); // Select the correct strategy based on the configuration - if ((ResponseCardinality.STRICT).equals(lambdaProcessorConfig.getResponseCardinality())) { + if (lambdaProcessorConfig.getResponseEventsMatch()) { this.responseStrategy = new StrictResponseEventHandlingStrategy(); } else { this.responseStrategy = new AggregateResponseEventHandlingStrategy(); @@ -141,7 +139,8 @@ public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pl lambdaCommonHandler = new LambdaCommonHandler(LOG, lambdaAsyncClient, functionName, invocationType, bufferFactory); currentBufferPerBatch = lambdaCommonHandler.createBuffer(currentBufferPerBatch); - LOG.info("LambdaFunctionName:{} , responseCardinality:{}, invocationType:{}", functionName, responseCardinality.getValue(), invocationType); + LOG.info("LambdaFunctionName:{} , responseEventsMatch:{}, invocationType:{}", functionName, + lambdaProcessorConfig.getResponseEventsMatch(), invocationType); } @Override diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java index 7dfb4421ce..7c0bf52754 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java @@ -43,8 +43,8 @@ public class LambdaProcessorConfig { private InvocationType invocationType = InvocationType.REQUEST_RESPONSE; @JsonPropertyDescription("Defines the way Data Prepper treats the response from Lambda") - @JsonProperty("response_cardinality") - private ResponseCardinality responseCardinality = ResponseCardinality.STRICT; + @JsonProperty("response_events_match") + private boolean responseEventsMatch = false; @JsonPropertyDescription("sdk timeout defines the time sdk maintains the connection to the client before timing out") @JsonProperty("connection_timeout") @@ -99,7 +99,7 @@ public InvocationType getInvocationType() { return invocationType; } - public ResponseCardinality getResponseCardinality() { - return responseCardinality; + public Boolean getResponseEventsMatch() { + return responseEventsMatch; } } \ No newline at end of file diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/ResponseCardinality.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/ResponseCardinality.java index b90b59c8a0..bc7f13489e 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/ResponseCardinality.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/ResponseCardinality.java @@ -6,6 +6,7 @@ import java.util.HashMap; import java.util.Map; +@Deprecated public enum ResponseCardinality { STRICT("strict"), AGGREGATE("aggregate"); diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java index 81d8864440..d7cdc5148b 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java @@ -116,7 +116,7 @@ public void setUp() throws Exception { when(lambdaProcessorConfig.getFunctionName()).thenReturn("test-function"); when(lambdaProcessorConfig.getWhenCondition()).thenReturn(null); when(lambdaProcessorConfig.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE); - when(lambdaProcessorConfig.getResponseCardinality()).thenReturn(ResponseCardinality.STRICT); + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false); when(lambdaProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn("testRole"); @@ -205,6 +205,7 @@ public void testDoExecute_WithRecords_WhenConditionFalse() throws Exception { @Test public void testDoExecute_WithRecords_SuccessfulProcessing() throws Exception { // Arrange + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); Event event = mock(Event.class); Record record = new Record<>(event); Collection> records = Collections.singletonList(record); @@ -289,6 +290,7 @@ public void testHandleFailure() throws Exception { @Test public void testConvertLambdaResponseToEvent_WithEqualEventCounts_SuccessfulProcessing() throws Exception { // Arrange + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); setupTestObject(); populatePrivateFields(); List> resultRecords = new ArrayList<>(); @@ -351,7 +353,7 @@ public void testConvertLambdaResponseToEvent_WithEqualEventCounts_SuccessfulProc @Test public void testConvertLambdaResponseToEvent_WithUnequalEventCounts_SuccessfulProcessing() throws Exception { // Arrange - when(lambdaProcessorConfig.getResponseCardinality()).thenReturn(ResponseCardinality.AGGREGATE); + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false); setupTestObject(); populatePrivateFields(); List> resultRecords = new ArrayList<>(); @@ -362,7 +364,6 @@ public void testConvertLambdaResponseToEvent_WithUnequalEventCounts_SuccessfulPr when(invokeResponse.payload()).thenReturn(sdkBytes); when(invokeResponse.statusCode()).thenReturn(200); // Success status code when(lambdaCommonHandler.checkStatusCode(any())).thenReturn(true); - when(lambdaProcessorConfig.getResponseCardinality()).thenReturn(ResponseCardinality.AGGREGATE); // Mock the responseCodec.parse to add three events doAnswer(invocation -> { InputStream inputStream = (InputStream) invocation.getArgument(0); @@ -433,7 +434,7 @@ public void testConvertLambdaResponseToEvent_WithUnequalEventCounts_FailOn_STRIC when(invokeResponse.payload()).thenReturn(sdkBytes); when(invokeResponse.statusCode()).thenReturn(200); // Success status code when(lambdaCommonHandler.checkStatusCode(any())).thenReturn(true); - when(lambdaProcessorConfig.getResponseCardinality()).thenReturn(ResponseCardinality.STRICT); + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); // Mock the responseCodec.parse to add three events doAnswer(invocation -> {