Skip to content

Commit

Permalink
Change response_cardinality config for lambda processor to response_e…
Browse files Browse the repository at this point in the history
…vents_match (opensearch-project#5132)

* Change response_cardinality config for lambda processor to response_events_match

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Address comments

Signed-off-by: Srikanth Govindarajan <[email protected]>

---------

Signed-off-by: Srikanth Govindarajan <[email protected]>
  • Loading branch information
srikanthjg authored Oct 30, 2024
1 parent c5c2fa6 commit 5d28955
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Eve
private final Counter numberOfRecordsFailedCounter;
private final Timer lambdaLatencyMetric;
private final String invocationType;
private final ResponseCardinality responseCardinality;
private final List<String> tagsOnMatchFailure;
private final BatchOptions batchOptions;
private final BufferFactory bufferFactory;
Expand Down Expand Up @@ -100,7 +99,6 @@ public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pl
this.responsePayloadMetric = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong());

functionName = lambdaProcessorConfig.getFunctionName();
responseCardinality = lambdaProcessorConfig.getResponseCardinality();
whenCondition = lambdaProcessorConfig.getWhenCondition();
maxRetries = lambdaProcessorConfig.getMaxConnectionRetries();
batchOptions = lambdaProcessorConfig.getBatchOptions();
Expand Down Expand Up @@ -131,7 +129,7 @@ public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pl
bufferFactory = new InMemoryBufferFactory();

// Select the correct strategy based on the configuration
if ((ResponseCardinality.STRICT).equals(lambdaProcessorConfig.getResponseCardinality())) {
if (lambdaProcessorConfig.getResponseEventsMatch()) {
this.responseStrategy = new StrictResponseEventHandlingStrategy();
} else {
this.responseStrategy = new AggregateResponseEventHandlingStrategy();
Expand All @@ -141,7 +139,8 @@ public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pl
lambdaCommonHandler = new LambdaCommonHandler(LOG, lambdaAsyncClient, functionName, invocationType, bufferFactory);
currentBufferPerBatch = lambdaCommonHandler.createBuffer(currentBufferPerBatch);

LOG.info("LambdaFunctionName:{} , responseCardinality:{}, invocationType:{}", functionName, responseCardinality.getValue(), invocationType);
LOG.info("LambdaFunctionName:{} , responseEventsMatch:{}, invocationType:{}", functionName,
lambdaProcessorConfig.getResponseEventsMatch(), invocationType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class LambdaProcessorConfig {
private InvocationType invocationType = InvocationType.REQUEST_RESPONSE;

@JsonPropertyDescription("Defines the way Data Prepper treats the response from Lambda")
@JsonProperty("response_cardinality")
private ResponseCardinality responseCardinality = ResponseCardinality.STRICT;
@JsonProperty("response_events_match")
private boolean responseEventsMatch = false;

@JsonPropertyDescription("sdk timeout defines the time sdk maintains the connection to the client before timing out")
@JsonProperty("connection_timeout")
Expand Down Expand Up @@ -99,7 +99,7 @@ public InvocationType getInvocationType() {
return invocationType;
}

public ResponseCardinality getResponseCardinality() {
return responseCardinality;
public Boolean getResponseEventsMatch() {
return responseEventsMatch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.HashMap;
import java.util.Map;

@Deprecated
public enum ResponseCardinality {
STRICT("strict"),
AGGREGATE("aggregate");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void setUp() throws Exception {
when(lambdaProcessorConfig.getFunctionName()).thenReturn("test-function");
when(lambdaProcessorConfig.getWhenCondition()).thenReturn(null);
when(lambdaProcessorConfig.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE);
when(lambdaProcessorConfig.getResponseCardinality()).thenReturn(ResponseCardinality.STRICT);
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false);
when(lambdaProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);
when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn("testRole");
Expand Down Expand Up @@ -205,6 +205,7 @@ public void testDoExecute_WithRecords_WhenConditionFalse() throws Exception {
@Test
public void testDoExecute_WithRecords_SuccessfulProcessing() throws Exception {
// Arrange
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true);
Event event = mock(Event.class);
Record<Event> record = new Record<>(event);
Collection<Record<Event>> records = Collections.singletonList(record);
Expand Down Expand Up @@ -289,6 +290,7 @@ public void testHandleFailure() throws Exception {
@Test
public void testConvertLambdaResponseToEvent_WithEqualEventCounts_SuccessfulProcessing() throws Exception {
// Arrange
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true);
setupTestObject();
populatePrivateFields();
List<Record<Event>> resultRecords = new ArrayList<>();
Expand Down Expand Up @@ -351,7 +353,7 @@ public void testConvertLambdaResponseToEvent_WithEqualEventCounts_SuccessfulProc
@Test
public void testConvertLambdaResponseToEvent_WithUnequalEventCounts_SuccessfulProcessing() throws Exception {
// Arrange
when(lambdaProcessorConfig.getResponseCardinality()).thenReturn(ResponseCardinality.AGGREGATE);
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false);
setupTestObject();
populatePrivateFields();
List<Record<Event>> resultRecords = new ArrayList<>();
Expand All @@ -362,7 +364,6 @@ public void testConvertLambdaResponseToEvent_WithUnequalEventCounts_SuccessfulPr
when(invokeResponse.payload()).thenReturn(sdkBytes);
when(invokeResponse.statusCode()).thenReturn(200); // Success status code
when(lambdaCommonHandler.checkStatusCode(any())).thenReturn(true);
when(lambdaProcessorConfig.getResponseCardinality()).thenReturn(ResponseCardinality.AGGREGATE);
// Mock the responseCodec.parse to add three events
doAnswer(invocation -> {
InputStream inputStream = (InputStream) invocation.getArgument(0);
Expand Down Expand Up @@ -433,7 +434,7 @@ public void testConvertLambdaResponseToEvent_WithUnequalEventCounts_FailOn_STRIC
when(invokeResponse.payload()).thenReturn(sdkBytes);
when(invokeResponse.statusCode()).thenReturn(200); // Success status code
when(lambdaCommonHandler.checkStatusCode(any())).thenReturn(true);
when(lambdaProcessorConfig.getResponseCardinality()).thenReturn(ResponseCardinality.STRICT);
when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true);

// Mock the responseCodec.parse to add three events
doAnswer(invocation -> {
Expand Down

0 comments on commit 5d28955

Please sign in to comment.