diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java index 31996d0e4b..16bb4e985b 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java @@ -13,7 +13,6 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; -import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.configuration.PluginModel; @@ -76,18 +75,16 @@ public class LambdaProcessor extends AbstractProcessor, Record invokeFuture = CompletableFuture.completedFuture(invokeResponse); -// when(lambdaAsyncClientMock.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); -// -// // Mock LambdaCommonHandler -// LambdaCommonHandler lambdaCommonHandler = mock(LambdaCommonHandler.class); -// when(lambdaCommonHandler.checkStatusCode(any())).thenReturn(true); -// } -// -// private void populatePrivateFields() throws Exception { -// List tagsOnMatchFailure = Collections.singletonList("failure_tag"); -// // Use reflection to set the private fields -// setPrivateField(lambdaProcessor, "requestCodec", requestCodec); -// setPrivateField(lambdaProcessor, "responseCodec", responseCodec); -// setPrivateField(lambdaProcessor, "futureList", new ArrayList<>()); -// setPrivateField(lambdaProcessor, "numberOfRecordsSuccessCounter", numberOfRecordsSuccessCounter); -// setPrivateField(lambdaProcessor, "numberOfRecordsFailedCounter", numberOfRecordsFailedCounter); -// setPrivateField(lambdaProcessor, "tagsOnMatchFailure", tagsOnMatchFailure); -// setPrivateField(lambdaProcessor, "lambdaCommonHandler", lambdaCommonHandler); -// } -// -// // Helper method to set private fields via reflection -// private void setPrivateField(Object targetObject, String fieldName, Object value) throws Exception { -// Field field = targetObject.getClass().getDeclaredField(fieldName); -// field.setAccessible(true); -// field.set(targetObject, value); -// } -// -// private void setupTestObject() { -// // Create the LambdaProcessor instance -// lambdaProcessor = new LambdaProcessor(pluginFactory, pluginMetrics, lambdaProcessorConfig, awsCredentialsSupplier, expressionEvaluator); -// } -// -// @Test -// public void testDoExecute_WithExceptionDuringProcessing() throws Exception { -// // Arrange -// Event event = mock(Event.class); -// Record record = new Record<>(event); -// List> records = Collections.singletonList(record); -// -// // Mock Buffer -// Buffer bufferMock = mock(Buffer.class); -// when(lambdaProcessor.lambdaCommonHandler.createBuffer(any(BufferFactory.class))).thenReturn(bufferMock); -// when(bufferMock.getEventCount()).thenReturn(0, 1); -// when(bufferMock.getRecords()).thenReturn(records); -// doNothing().when(bufferMock).reset(); -// -// // Mock exception during flush -// when(bufferMock.flushToLambda(any())).thenThrow(new RuntimeException("Test exception")); -// -// // Act -// Collection> result = lambdaProcessor.doExecute(records); -// -//// // Wait for futures to complete -//// lambdaProcessor.lambdaCommonHandler.waitForFutures(lambdaProcessor.futureList); -// -// // Assert -// assertEquals(1, result.size()); -// verify(numberOfRecordsFailedCounter, times(1)).increment(1.0); -// } -// -// @Test -// public void testDoExecute_WithEmptyResponse() throws Exception { -// // Arrange -// Event event = mock(Event.class); -// Record record = new Record<>(event); -// List> records = Collections.singletonList(record); -// -// // Mock Buffer -// Buffer bufferMock = mock(Buffer.class); -// when(lambdaProcessor.lambdaCommonHandler.createBuffer(any(BufferFactory.class))).thenReturn(bufferMock); -// when(bufferMock.getEventCount()).thenReturn(0, 1); -// when(bufferMock.getRecords()).thenReturn(records); -// when(bufferMock.flushToLambda(any())).thenReturn(CompletableFuture.completedFuture(invokeResponse)); -// doNothing().when(bufferMock).reset(); -// -// when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("")); -// -// // Act -// Collection> result = lambdaProcessor.doExecute(records); -// -//// // Wait for futures to complete -//// lambdaProcessor.lambdaCommonHandler.waitForFutures(lambdaProcessor.futureList); -// -// // Assert -// assertEquals(0, result.size()); -// verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); -// } -// -// @Test -// public void testDoExecute_WithNullResponse() throws Exception { -// // Arrange -// Event event = mock(Event.class); -// Record record = new Record<>(event); -// List> records = Collections.singletonList(record); -// -// // Mock Buffer -// Buffer bufferMock = mock(Buffer.class); -// when(lambdaProcessor.lambdaCommonHandler.createBuffer(any(BufferFactory.class))).thenReturn(bufferMock); -// when(bufferMock.getEventCount()).thenReturn(0, 1); -// when(bufferMock.getRecords()).thenReturn(records); -// when(bufferMock.flushToLambda(any())).thenReturn(CompletableFuture.completedFuture(invokeResponse)); -// doNothing().when(bufferMock).reset(); -// -// when(invokeResponse.payload()).thenReturn(null); -// -// // Act -// Collection> result = lambdaProcessor.doExecute(records); -// -// // Wait for futures to complete -//// lambdaProcessor.lambdaCommonHandler.waitForFutures(lambdaProcessor.futureList); -// -// // Assert -// assertEquals(0, result.size()); -// verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); -// } -// -// @Test -// public void testDoExecute_WithEmptyRecords() { -// Collection> records = Collections.emptyList(); -// Collection> result = lambdaProcessor.doExecute(records); -// assertEquals(0, result.size()); -// } -// -// @Test -// public void testDoExecute_WhenConditionFalse() { -// Event event = mock(Event.class); -// Record record = new Record<>(event); -// Collection> records = Collections.singletonList(record); -// when(expressionEvaluator.evaluateConditional(anyString(), eq(event))).thenReturn(false); -// when(lambdaProcessorConfig.getWhenCondition()).thenReturn("some_condition"); -// setupTestObject(); -// -// Collection> result = lambdaProcessor.doExecute(records); -// -// assertEquals(1, result.size()); -// verify(bufferMock, never()).flushToLambda(anyString()); -// } -// -// @Test -// public void testDoExecute_SuccessfulProcessing() throws Exception { -// Event eventMock = mock(Event.class); -// Record record = new Record<>(eventMock); -// Collection> records = Collections.singletonList(record); -// -// when(bufferMock.getEventCount()).thenReturn(0).thenReturn(1); -// when(bufferMock.getRecords()).thenReturn(Collections.singletonList(record)); -// doNothing().when(bufferMock).reset(); -// -// // Initialize futureList -// setPrivateField(lambdaProcessor, "futureList", new ArrayList<>()); -// -// Collection> result = lambdaProcessor.doExecute(records); -// -// // Wait for futures to complete -//// lambdaProcessor.lambdaCommonHandler.waitForFutures(lambdaProcessor.futureList); -// -// assertEquals(1, result.size(), "Result should contain one record."); -// verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); -// verify(requestCodec, times(1)).writeEvent(eq(eventMock), any()); -// } -// -// @Test -// public void testHandleFailure() { -// Event event = mock(Event.class); -// Buffer bufferMock = mock(Buffer.class); -// List> records = List.of(new Record<>(event)); -// when(bufferMock.getEventCount()).thenReturn(1); -// -// lambdaProcessor.handleFailure(new RuntimeException("Test Exception"), bufferMock, records); -// -// verify(numberOfRecordsFailedCounter, times(1)).increment(1); -// } -// -// @Test -// public void testConvertLambdaResponseToEvent_WithEmptyPayload() throws Exception { -// Event event = mock(Event.class); -// Record record = new Record<>(event); -// List> records = Collections.singletonList(record); -// -// InMemoryBuffer bufferMock = mock(InMemoryBuffer.class); -// CompletableFuture mockedFuture = CompletableFuture.completedFuture(invokeResponse); -// when(bufferMock.flushToLambda(any())).thenReturn(mockedFuture); -// when(lambdaCommonHandler.checkStatusCode(invokeResponse)).thenReturn(true); -// when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("")); -// -// Collection> result = lambdaProcessor.doExecute(records); -// assertEquals(0, result.size()); -// verify(numberOfRecordsSuccessCounter, times(1)).increment(1.0); -// } -// -//// @Test -//// public void testConvertLambdaResponseToEvent_WithEqualEventCounts_SuccessfulProcessing() throws Exception { -//// // Arrange -//// when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(true); -//// -//// // Mock LambdaResponse with a valid payload -//// String payloadString = "[{\"key\":\"value1\"}, {\"key\":\"value2\"}]"; -//// SdkBytes sdkBytes = SdkBytes.fromByteArray(payloadString.getBytes()); -//// when(invokeResponse.payload()).thenReturn(sdkBytes); -//// when(invokeResponse.statusCode()).thenReturn(200); // Success status code -//// -//// // Mock the responseCodec.parse to add two events -//// doAnswer(invocation -> { -//// InputStream inputStream = invocation.getArgument(0); -//// @SuppressWarnings("unchecked") Consumer> consumer = invocation.getArgument(1); -//// Event parsedEvent1 = mock(Event.class); -//// EventMetadata parsedEventMetadata1 = mock(EventMetadata.class); -//// when(parsedEvent1.getMetadata()).thenReturn(parsedEventMetadata1); -//// -//// DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); -//// AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); -//// -//// when(parsedEvent1.getEventHandle()).thenReturn(eventHandle); -//// when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet); -//// -//// Event parsedEvent2 = mock(Event.class); -//// EventMetadata parsedEventMetadata2 = mock(EventMetadata.class); -//// when(parsedEvent2.getMetadata()).thenReturn(parsedEventMetadata2); -//// when(parsedEvent2.getEventHandle()).thenReturn(eventHandle); -//// -//// consumer.accept(new Record<>(parsedEvent1)); -//// consumer.accept(new Record<>(parsedEvent2)); -//// return null; -//// }).when(responseCodec).parse(any(InputStream.class), any(Consumer.class)); -//// -//// // Mock buffer with two original events -//// Event originalEvent1 = mock(Event.class); -//// EventMetadata originalEventMetadata1 = mock(EventMetadata.class); -//// when(originalEvent1.getMetadata()).thenReturn(originalEventMetadata1); -//// -//// Event originalEvent2 = mock(Event.class); -//// EventMetadata originalEventMetadata2 = mock(EventMetadata.class); -//// when(originalEvent2.getMetadata()).thenReturn(originalEventMetadata2); -//// -//// -//// DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); -//// AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); -//// when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet); -//// -//// when(originalEvent1.getEventHandle()).thenReturn(eventHandle); -//// when(originalEvent2.getEventHandle()).thenReturn(eventHandle); -//// -//// -//// List> originalRecords = Arrays.asList(new Record<>(originalEvent1), new Record<>(originalEvent2)); -//// -//// Buffer flushedBuffer = mock(Buffer.class); -//// when(flushedBuffer.getEventCount()).thenReturn(2); -//// when(flushedBuffer.getRecords()).thenReturn(originalRecords); -//// -//// List> resultRecords = new ArrayList<>(); -//// -//// // Act -//// lambdaProcessor.convertLambdaResponseToEvent(resultRecords, invokeResponse, flushedBuffer); -//// -//// // Assert -//// assertNotNull(resultRecords); -//// assertEquals(2, resultRecords.size(), "ResultRecords should contain two records"); -//// -//// } -//// -//// @Test -//// public void testConvertLambdaResponseToEvent_WithUnequalEventCounts_SuccessfulProcessing() throws Exception { -//// // Arrange -//// when(lambdaProcessorConfig.getResponseEventsMatch()).thenReturn(false); -//// -//// // Mock LambdaResponse with a valid payload containing three events -//// String payloadString = "[{\"key\":\"value1\"}, {\"key\":\"value2\"}, {\"key\":\"value3\"}]"; -//// SdkBytes sdkBytes = SdkBytes.fromByteArray(payloadString.getBytes()); -//// when(invokeResponse.payload()).thenReturn(sdkBytes); -//// when(invokeResponse.statusCode()).thenReturn(200); // Success status code -//// -//// // Mock the responseCodec.parse to add three events -//// doAnswer(invocation -> { -//// InputStream inputStream = invocation.getArgument(0); -//// Consumer> consumer = invocation.getArgument(1); -//// Event parsedEvent1 = mock(Event.class); -//// Event parsedEvent2 = mock(Event.class); -//// Event parsedEvent3 = mock(Event.class); -//// -//// consumer.accept(new Record<>(parsedEvent1)); -//// consumer.accept(new Record<>(parsedEvent2)); -//// consumer.accept(new Record<>(parsedEvent3)); -//// return null; -//// }).when(responseCodec).parse(any(InputStream.class), any(Consumer.class)); -//// -//// // Mock buffer with two original events -//// Event originalEvent1 = mock(Event.class); -//// Event originalEvent2 = mock(Event.class); -//// -//// DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); -//// AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); -//// when(eventHandle.getAcknowledgementSet()).thenReturn(acknowledgementSet); -//// -//// when(originalEvent1.getEventHandle()).thenReturn(eventHandle); -//// when(originalEvent2.getEventHandle()).thenReturn(eventHandle); -//// -//// List> originalRecords = Arrays.asList(new Record<>(originalEvent1), new Record<>(originalEvent2)); -//// -//// Buffer flushedBuffer = mock(Buffer.class); -//// when(flushedBuffer.getEventCount()).thenReturn(2); -//// when(flushedBuffer.getRecords()).thenReturn(originalRecords); -//// -//// List> resultRecords = new ArrayList<>(); -//// -//// // Act -//// lambdaProcessor.convertLambdaResponseToEvent(resultRecords, invokeResponse, flushedBuffer); -//// -//// // Assert -//// assertNotNull(resultRecords); -//// assertEquals(3, resultRecords.size(), "ResultRecords should contain three records"); -//// -//// // Verify that original events were not cleared -//// verify(originalEvent1, never()).clear(); -//// verify(originalEvent2, never()).clear(); -//// } -// -//} - - /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0