diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java index f891fdd5de..f858d569e9 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java @@ -142,7 +142,8 @@ public Collection> doExecute(Collection> records) { } recordsToLambda.add(record); } - resultRecords.addAll( + try { + resultRecords.addAll( lambdaCommonHandler.sendRecords(recordsToLambda, lambdaProcessorConfig, lambdaAsyncClient, (inputBuffer, response) -> { Duration latency = inputBuffer.stopLatencyWatch(); @@ -158,7 +159,12 @@ public Collection> doExecute(Collection> records) { numberOfRequestsFailedCounter.increment(); addFailureTags(inputBuffer, outputRecords); }) - ); + + ); + } catch (Exception e) { + numberOfRecordsFailedCounter.increment(recordsToLambda.size()); + resultRecords.addAll(recordsToLambda); + } return resultRecords; } diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java index 982bced5ab..6c60f6c7fb 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java @@ -9,10 +9,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED; import static org.opensearch.dataprepper.plugins.lambda.processor.LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS; @@ -21,11 +18,15 @@ import java.io.InputStream; import java.lang.reflect.Field; import java.time.Duration; +import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -45,12 +46,14 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.lambda.common.LambdaCommonHandler; import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer; import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig; import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType; import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions; @@ -62,319 +65,285 @@ @MockitoSettings(strictness = Strictness.LENIENT) public class LambdaProcessorTest { + // Mock dependencies + @Mock + private AwsAuthenticationOptions awsAuthenticationOptions; - // Mock dependencies - @Mock - private AwsAuthenticationOptions awsAuthenticationOptions; + @Mock + private Buffer bufferMock; - @Mock - private Buffer bufferMock; + @Mock + private PluginFactory pluginFactory; - @Mock - private PluginFactory pluginFactory; + @Mock + private PluginMetrics pluginMetrics; - @Mock - private PluginMetrics pluginMetrics; + @Mock + private LambdaProcessorConfig lambdaProcessorConfig; - @Mock - private LambdaProcessorConfig lambdaProcessorConfig; + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; - @Mock - private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock + private ExpressionEvaluator expressionEvaluator; - @Mock - private ExpressionEvaluator expressionEvaluator; + @Mock + private LambdaCommonHandler lambdaCommonHandler; - @Mock - private LambdaCommonHandler lambdaCommonHandler; + @Mock + private InputCodec responseCodec; - @Mock - private InputCodec responseCodec; + @Mock + private OutputCodec requestCodec; - @Mock - private OutputCodec requestCodec; + @Mock + private Counter numberOfRecordsSuccessCounter; - @Mock - private Counter numberOfRecordsSuccessCounter; + @Mock + private Counter numberOfRecordsFailedCounter; - @Mock - private Counter numberOfRecordsFailedCounter; + @Mock + private InvokeResponse invokeResponse; - @Mock - private InvokeResponse invokeResponse; + @Mock + private Timer lambdaLatencyMetric; - @Mock - private Timer lambdaLatencyMetric; + @Captor + private ArgumentCaptor>> consumerCaptor; - @Captor - private ArgumentCaptor>> consumerCaptor; + // The class under test + private LambdaProcessor lambdaProcessor; - // The class under test - private LambdaProcessor lambdaProcessor; + @BeforeEach + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this); - @BeforeEach - public void setUp() throws Exception { - MockitoAnnotations.openMocks(this); + // Mock PluginMetrics counters and timers + when(pluginMetrics.counter(eq(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS))).thenReturn( + numberOfRecordsSuccessCounter); + when(pluginMetrics.counter(eq(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED))).thenReturn( + numberOfRecordsFailedCounter); + when(pluginMetrics.timer(anyString())).thenReturn(lambdaLatencyMetric); + when(pluginMetrics.gauge(anyString(), any(AtomicLong.class))).thenAnswer( + invocation -> invocation.getArgument(1)); - // Mock PluginMetrics counters and timers - when(pluginMetrics.counter(eq(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS))).thenReturn( - numberOfRecordsSuccessCounter); - when(pluginMetrics.counter(eq(NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED))).thenReturn( - numberOfRecordsFailedCounter); - when(pluginMetrics.timer(anyString())).thenReturn(lambdaLatencyMetric); - when(pluginMetrics.gauge(anyString(), any(AtomicLong.class))).thenAnswer( - invocation -> invocation.getArgument(1)); + when(lambdaProcessorConfig.getFunctionName()).thenReturn("test-function"); + when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); + when(lambdaProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(lambdaProcessorConfig.getConnectionTimeout()).thenReturn(Duration.ofSeconds(5)); + when(lambdaProcessorConfig.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE); + BatchOptions batchOptions = mock(BatchOptions.class); + ThresholdOptions thresholdOptions = mock(ThresholdOptions.class); + when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions); + when(lambdaProcessorConfig.getWhenCondition()).thenReturn(null); + when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false); - when(lambdaProcessorConfig.getFunctionName()).thenReturn("test-function"); - when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3); - when(lambdaProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); - when(lambdaProcessorConfig.getConnectionTimeout()).thenReturn(Duration.ofSeconds(5)); - when(lambdaProcessorConfig.getInvocationType()).thenReturn(InvocationType.REQUEST_RESPONSE); - BatchOptions batchOptions = mock(BatchOptions.class); - ThresholdOptions thresholdOptions = mock(ThresholdOptions.class); - when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions); - when(lambdaProcessorConfig.getWhenCondition()).thenReturn(null); - when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false); + // Mock AWS Authentication Options + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); + when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn("testRole"); + + // Mock BatchOptions and ThresholdOptions + when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions); + when(thresholdOptions.getEventCount()).thenReturn(10); + when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("6mb")); + when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.ofSeconds(30)); + when(batchOptions.getKeyName()).thenReturn("key"); + + // Mock Response Codec Configuration + PluginModel responseCodecConfig = lambdaProcessorConfig.getResponseCodecConfig(); + PluginSetting responseCodecPluginSetting; + + if (responseCodecConfig == null) { + // Default to JsonInputCodec with default settings + responseCodecPluginSetting = new PluginSetting("json", Collections.emptyMap()); + } else { + responseCodecPluginSetting = new PluginSetting(responseCodecConfig.getPluginName(), + responseCodecConfig.getPluginSettings()); + } + + // Mock PluginFactory to return the mocked responseCodec + when(pluginFactory.loadPlugin(eq(InputCodec.class), any(PluginSetting.class))).thenReturn( + responseCodec); + + // Instantiate the LambdaProcessor manually + lambdaProcessor = new LambdaProcessor(pluginFactory, pluginMetrics, lambdaProcessorConfig, + awsCredentialsSupplier, expressionEvaluator); + + // Mock InvokeResponse + when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("[{\"key\":\"value\"}]")); + when(invokeResponse.statusCode()).thenReturn(200); // Success status code - // Mock AWS Authentication Options - when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); - when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn("testRole"); + // Mock LambdaAsyncClient inside LambdaProcessor + LambdaAsyncClient lambdaAsyncClientMock = mock(LambdaAsyncClient.class); + setPrivateField(lambdaProcessor, "lambdaAsyncClient", lambdaAsyncClientMock); - // Mock BatchOptions and ThresholdOptions - when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions); - when(thresholdOptions.getEventCount()).thenReturn(10); - when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("6mb")); - when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.ofSeconds(30)); - when(batchOptions.getKeyName()).thenReturn("key"); + // Mock the invoke method to return a completed future + CompletableFuture invokeFuture = CompletableFuture.completedFuture( + invokeResponse); + when(lambdaAsyncClientMock.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); - // Mock Response Codec Configuration - PluginModel responseCodecConfig = lambdaProcessorConfig.getResponseCodecConfig(); - PluginSetting responseCodecPluginSetting; + // Mock Response Codec parse method + doNothing().when(responseCodec).parse(any(InputStream.class), any(Consumer.class)); - if (responseCodecConfig == null) { - // Default to JsonInputCodec with default settings - responseCodecPluginSetting = new PluginSetting("json", Collections.emptyMap()); - } else { - responseCodecPluginSetting = new PluginSetting(responseCodecConfig.getPluginName(), - responseCodecConfig.getPluginSettings()); } - // Mock PluginFactory to return the mocked responseCodec - when(pluginFactory.loadPlugin(eq(InputCodec.class), any(PluginSetting.class))).thenReturn( - responseCodec); + private void populatePrivateFields() throws Exception { + List tagsOnMatchFailure = Collections.singletonList("failure_tag"); + // Use reflection to set the private fields + setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", + numberOfRecordsSuccessCounter); + setPrivateField(lambdaProcessor, "numberOfRecordsFailedCounter", numberOfRecordsFailedCounter); + setPrivateField(lambdaProcessor, "tagsOnMatchFailure", tagsOnMatchFailure); + setPrivateField(lambdaProcessor, "lambdaCommonHandler", lambdaCommonHandler); + } - // Instantiate the LambdaProcessor manually - lambdaProcessor = new LambdaProcessor(pluginFactory, pluginMetrics, lambdaProcessorConfig, - awsCredentialsSupplier, expressionEvaluator); + // Helper method to set private fields via reflection + private void setPrivateField(Object targetObject, String fieldName, Object value) + throws Exception { + Field field = targetObject.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(targetObject, value); + } - // Mock InvokeResponse - when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("[{\"key\":\"value\"}]")); - when(invokeResponse.statusCode()).thenReturn(200); // Success status code + @Test + public void testDoExecute_WithExceptionDuringProcessing() throws Exception { + // Arrange + Event event = mock(Event.class); + Record record = new Record<>(event); + List> records = Collections.singletonList(record); - // Mock LambdaAsyncClient inside LambdaProcessor - LambdaAsyncClient lambdaAsyncClientMock = mock(LambdaAsyncClient.class); - setPrivateField(lambdaProcessor, "lambdaAsyncClient", lambdaAsyncClientMock); + // make batch options null to generate exception + when(lambdaProcessorConfig.getBatchOptions()).thenReturn(null); + // Act + Collection> result = lambdaProcessor.doExecute(records); - // Mock the invoke method to return a completed future - CompletableFuture invokeFuture = CompletableFuture.completedFuture( - invokeResponse); - when(lambdaAsyncClientMock.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); + // Assert + assertEquals(1, result.size()); + verify(numberOfRecordsFailedCounter, times(1)).increment(1.0); + } - // Mock Response Codec parse method - doNothing().when(responseCodec).parse(any(InputStream.class), any(Consumer.class)); + @Test + public void testDoExecute_WithEmptyResponse() throws Exception { + // Arrange + Event event = mock(Event.class); + Record record = new Record<>(event); + List> records = Collections.singletonList(record); - } + // Mock Buffer to return empty payload + when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("")); - private void populatePrivateFields() throws Exception { - List tagsOnMatchFailure = Collections.singletonList("failure_tag"); - // Use reflection to set the private fields - setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", - numberOfRecordsSuccessCounter); - setPrivateField(lambdaProcessor, "numberOfRecordsFailedCounter", numberOfRecordsFailedCounter); - setPrivateField(lambdaProcessor, "tagsOnMatchFailure", tagsOnMatchFailure); - setPrivateField(lambdaProcessor, "lambdaCommonHandler", lambdaCommonHandler); - } + // Act + Collection> result = lambdaProcessor.doExecute(records); - // Helper method to set private fields via reflection - private void setPrivateField(Object targetObject, String fieldName, Object value) - throws Exception { - Field field = targetObject.getClass().getDeclaredField(fieldName); - field.setAccessible(true); - field.set(targetObject, value); - } + // Assert + assertEquals(0, result.size(), "Result should be empty due to empty Lambda response."); + //verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); + } - /* @Test - public void testDoExecute_WithExceptionDuringProcessing() throws Exception { + public void testDoExecute_WithNullResponse() throws Exception { // Arrange Event event = mock(Event.class); Record record = new Record<>(event); List> records = Collections.singletonList(record); - // Mock Buffer - Buffer bufferMock = mock(Buffer.class); - when(lambdaProcessor.lambdaCommonHandler.createBuffer(any(BufferFactory.class))).thenReturn(bufferMock); - when(bufferMock.getEventCount()).thenReturn(0, 1); - when(bufferMock.getRecords()).thenReturn(records); - doNothing().when(bufferMock).reset(); + // Mock Buffer to return null payload + when(invokeResponse.payload()).thenReturn(null); + + // Act + Collection> result = lambdaProcessor.doExecute(records); + + // Assert + assertEquals(0, result.size(), "Result should be empty due to null Lambda response."); + //verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); + } - // Mock exception during flush - when(bufferMock.flushToLambda(any())).thenThrow(new RuntimeException("Test exception")); + @Test + public void testDoExecute_WithEmptyRecords() { + // Arrange + Collection> records = Collections.emptyList(); // Act Collection> result = lambdaProcessor.doExecute(records); // Assert - assertEquals(1, result.size()); - verify(numberOfRecordsFailedCounter, times(1)).increment(1.0); + assertEquals(0, result.size(), "Result should be empty when input records are empty."); + verify(numberOfRecordsSuccessCounter, never()).increment(anyDouble()); + verify(numberOfRecordsFailedCounter, never()).increment(anyDouble()); } - */ - - @Test - public void testDoExecute_WithEmptyResponse() throws Exception { - // Arrange - Event event = mock(Event.class); - Record record = new Record<>(event); - List> records = Collections.singletonList(record); - - // Mock Buffer to return empty payload - when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("")); - - // Act - Collection> result = lambdaProcessor.doExecute(records); - - // Assert - assertEquals(0, result.size(), "Result should be empty due to empty Lambda response."); - //verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); - } - - @Test - public void testDoExecute_WithNullResponse() throws Exception { - // Arrange - Event event = mock(Event.class); - Record record = new Record<>(event); - List> records = Collections.singletonList(record); - - // Mock Buffer to return null payload - when(invokeResponse.payload()).thenReturn(null); - - // Act - Collection> result = lambdaProcessor.doExecute(records); - - // Assert - assertEquals(0, result.size(), "Result should be empty due to null Lambda response."); - //verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); - } - - @Test - public void testDoExecute_WithEmptyRecords() { - // Arrange - Collection> records = Collections.emptyList(); - - // Act - Collection> result = lambdaProcessor.doExecute(records); - - // Assert - assertEquals(0, result.size(), "Result should be empty when input records are empty."); - //verify(numberOfRecordsSuccessCounter, never()).increment(anyDouble()); - //verify(numberOfRecordsFailedCounter, never()).increment(anyDouble()); - } - - @Test - public void testDoExecute_WhenConditionFalse() { - // Arrange - Event event = mock(Event.class); - DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); - AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); - when(event.getEventHandle()).thenReturn(eventHandle); - when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet); - Record record = new Record<>(event); - Collection> records = Collections.singletonList(record); - - // Mock condition evaluator to return false - when(expressionEvaluator.evaluateConditional(anyString(), eq(event))).thenReturn(false); - when(lambdaProcessorConfig.getWhenCondition()).thenReturn("some_condition"); - - // Instantiate the LambdaProcessor manually - lambdaProcessor = new LambdaProcessor(pluginFactory, pluginMetrics, lambdaProcessorConfig, - awsCredentialsSupplier, expressionEvaluator); - - // Act - Collection> result = lambdaProcessor.doExecute(records); - - // Assert - assertEquals(1, result.size(), "Result should contain one record as the condition is false."); - //verify(numberOfRecordsSuccessCounter, never()).increment(anyDouble()); - //verify(numberOfRecordsFailedCounter, never()).increment(anyDouble()); - } - - @Test - public void testDoExecute_SuccessfulProcessing() throws Exception { - // Arrange - Event event = mock(Event.class); - DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); - AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); - when(event.getEventHandle()).thenReturn(eventHandle); - when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet); - Record record = new Record<>(event); - Collection> records = Collections.singletonList(record); - - // Mock LambdaAsyncClient inside LambdaProcessor - LambdaAsyncClient lambdaAsyncClientMock = mock(LambdaAsyncClient.class); - setPrivateField(lambdaProcessor, "lambdaAsyncClient", lambdaAsyncClientMock); - - // Mock the invoke method to return a completed future - CompletableFuture invokeFuture = CompletableFuture.completedFuture( - invokeResponse); - when(lambdaAsyncClientMock.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); - - // Mock Buffer behavior - when(bufferMock.getEventCount()).thenReturn(0).thenReturn(1).thenReturn(0); - when(bufferMock.getRecords()).thenReturn(Collections.singletonList(record)); - - doAnswer(invocation -> { - InputStream inputStream = invocation.getArgument(0); - @SuppressWarnings("unchecked") - Consumer> consumer = invocation.getArgument(1); - - // Simulate parsing by providing a mocked event - Event parsedEvent = mock(Event.class); - Record parsedRecord = new Record<>(parsedEvent); - consumer.accept(parsedRecord); - - return null; - }).when(responseCodec).parse(any(InputStream.class), any(Consumer.class)); - - // Act - Collection> result = lambdaProcessor.doExecute(records); - - // Assert - assertEquals(1, result.size(), "Result should contain one record."); - //verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); - } - - ; - - - /* + @Test - public void testHandleFailure() { + public void testDoExecute_WhenConditionFalse() { // Arrange Event event = mock(Event.class); - Buffer bufferMock = mock(Buffer.class); - List> records = List.of(new Record<>(event)); - when(bufferMock.getEventCount()).thenReturn(1); - when(bufferMock.getRecords()).thenReturn(records); + DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); + AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); + when(event.getEventHandle()).thenReturn(eventHandle); + when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet); + Record record = new Record<>(event); + Collection> records = Collections.singletonList(record); + + // Mock condition evaluator to return false + when(expressionEvaluator.evaluateConditional(anyString(), eq(event))).thenReturn(false); + when(lambdaProcessorConfig.getWhenCondition()).thenReturn("some_condition"); + + // Instantiate the LambdaProcessor manually + lambdaProcessor = new LambdaProcessor(pluginFactory, pluginMetrics, lambdaProcessorConfig, + awsCredentialsSupplier, expressionEvaluator); // Act - lambdaProcessor.handleFailure(new RuntimeException("Test Exception"), bufferMock, records); + Collection> result = lambdaProcessor.doExecute(records); // Assert - verify(numberOfRecordsFailedCounter, times(1)).increment(1.0); - // Ensure failure tags are added; assuming addFailureTags is implemented correctly - // You might need to verify interactions with event metadata if it's mocked + assertEquals(1, result.size(), "Result should contain one record as the condition is false."); + verify(numberOfRecordsSuccessCounter, never()).increment(anyDouble()); + verify(numberOfRecordsFailedCounter, never()).increment(anyDouble()); } + @Test + public void testDoExecute_SuccessfulProcessing() throws Exception { + // Arrange + Event event = mock(Event.class); + DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); + AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); + when(event.getEventHandle()).thenReturn(eventHandle); + when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet); + Record record = new Record<>(event); + Collection> records = Collections.singletonList(record); + + // Mock LambdaAsyncClient inside LambdaProcessor + LambdaAsyncClient lambdaAsyncClientMock = mock(LambdaAsyncClient.class); + setPrivateField(lambdaProcessor, "lambdaAsyncClient", lambdaAsyncClientMock); + + // Mock the invoke method to return a completed future + CompletableFuture invokeFuture = CompletableFuture.completedFuture( + invokeResponse); + when(lambdaAsyncClientMock.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); + + // Mock Buffer behavior + when(bufferMock.getEventCount()).thenReturn(0).thenReturn(1).thenReturn(0); + when(bufferMock.getRecords()).thenReturn(Collections.singletonList(record)); + + doAnswer(invocation -> { + InputStream inputStream = invocation.getArgument(0); + @SuppressWarnings("unchecked") + Consumer> consumer = invocation.getArgument(1); + + // Simulate parsing by providing a mocked event + Event parsedEvent = mock(Event.class); + Record parsedRecord = new Record<>(parsedEvent); + consumer.accept(parsedRecord); + + return null; + }).when(responseCodec).parse(any(InputStream.class), any(Consumer.class)); + + // Act + Collection> result = lambdaProcessor.doExecute(records); + + // Assert + assertEquals(1, result.size(), "Result should contain one record."); + verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); + } @Test public void testConvertLambdaResponseToEvent_WithEqualEventCounts_SuccessfulProcessing() throws Exception { @@ -415,8 +384,7 @@ public void testConvertLambdaResponseToEvent_WithEqualEventCounts_SuccessfulProc when(bufferMock.getEventCount()).thenReturn(2); // Act - List> resultRecords = new ArrayList<>(); - lambdaProcessor.convertLambdaResponseToEvent(resultRecords, invokeResponse, bufferMock, responseCodec); + List> resultRecords = lambdaProcessor.convertLambdaResponseToEvent(bufferMock, invokeResponse); // Assert assertEquals(2, resultRecords.size(), "ResultRecords should contain two records."); @@ -477,14 +445,10 @@ public void testConvertLambdaResponseToEvent_WithUnequalEventCounts_SuccessfulPr when(bufferMock.getEventCount()).thenReturn(2); // Act - List> resultRecords = new ArrayList<>(); - lambdaProcessor.convertLambdaResponseToEvent(resultRecords, invokeResponse, bufferMock, responseCodec); - + List> resultRecords = lambdaProcessor.convertLambdaResponseToEvent(bufferMock, invokeResponse); // Assert // Verify that three records are added to the result assertEquals(3, resultRecords.size(), "ResultRecords should contain three records."); } - */ - }