Merge branch 'lambda-fixes1' of github.com:kkondaka/kk-data-prepper-f2 into lambda-fixes1
san81 committed Nov 17, 2024
2 parents ee445d5 + 16e9f5b commit 46033fa
Showing 3 changed files with 73 additions and 51 deletions.
LambdaProcessor.java
@@ -6,17 +6,6 @@
package org.opensearch.dataprepper.plugins.lambda.processor;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
@@ -36,17 +25,33 @@
import org.opensearch.dataprepper.plugins.lambda.common.ResponseEventHandlingStrategy;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.client.LambdaClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name = "aws_lambda", pluginType = Processor.class, pluginConfigurationType = LambdaProcessorConfig.class)
public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaProcessorObjectsEventsSucceeded";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaProcessorObjectsEventsFailed";
public static final String NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA = "lambdaProcessorNumberOfRequestsSucceeded";
public static final String NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA = "lambdaProcessorNumberOfRequestsFailed";
public static final String LAMBDA_LATENCY_METRIC = "lambdaProcessorLatency";
public static final String REQUEST_PAYLOAD_SIZE = "requestPayloadSize";
public static final String RESPONSE_PAYLOAD_SIZE = "responsePayloadSize";
@@ -59,11 +64,13 @@ public class LambdaProcessor extends AbstractProcessor<Record<Event>, Record<Eve
private final ExpressionEvaluator expressionEvaluator;
private final Counter numberOfRecordsSuccessCounter;
private final Counter numberOfRecordsFailedCounter;
private final Counter numberOfRequestsSuccessCounter;
private final Counter numberOfRequestsFailedCounter;
private final Timer lambdaLatencyMetric;
private final List<String> tagsOnMatchFailure;
private final LambdaAsyncClient lambdaAsyncClient;
private final AtomicLong requestPayloadMetric;
private final AtomicLong responsePayloadMetric;
private final DistributionSummary requestPayloadMetric;
private final DistributionSummary responsePayloadMetric;
private final ResponseEventHandlingStrategy responseStrategy;
private final JsonOutputCodecConfig jsonOutputCodecConfig;
LambdaCommonHandler lambdaCommonHandler;
@@ -78,15 +85,18 @@ public LambdaProcessor(final PluginFactory pluginFactory, final PluginMetrics pl
this.pluginFactory = pluginFactory;
this.lambdaProcessorConfig = lambdaProcessorConfig;
this.numberOfRecordsSuccessCounter = pluginMetrics.counter(
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);
this.numberOfRecordsFailedCounter = pluginMetrics.counter(
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED);
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED);
this.numberOfRequestsSuccessCounter = pluginMetrics.counter(
NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA);
this.numberOfRequestsFailedCounter = pluginMetrics.counter(
NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA);
this.lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC);
this.requestPayloadMetric = pluginMetrics.gauge(REQUEST_PAYLOAD_SIZE, new AtomicLong());
this.responsePayloadMetric = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong());
this.requestPayloadMetric = pluginMetrics.summary(REQUEST_PAYLOAD_SIZE);
this.responsePayloadMetric = pluginMetrics.summary(RESPONSE_PAYLOAD_SIZE);
this.whenCondition = lambdaProcessorConfig.getWhenCondition();

tagsOnMatchFailure = lambdaProcessorConfig.getTagsOnFailure();
this.tagsOnMatchFailure = lambdaProcessorConfig.getTagsOnFailure();

PluginModel responseCodecConfig = lambdaProcessorConfig.getResponseCodecConfig();

@@ -134,8 +144,18 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
}
resultRecords.addAll(
lambdaCommonHandler.sendRecords(recordsToLambda, lambdaProcessorConfig, lambdaAsyncClient,
(inputBuffer, response) -> convertLambdaResponseToEvent(inputBuffer, response),
(inputBuffer, response) -> {
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount());
numberOfRequestsSuccessCounter.increment();
return convertLambdaResponseToEvent(inputBuffer, response);
},
(inputBuffer, outputRecords) -> {
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount());
numberOfRequestsFailedCounter.increment();
addFailureTags(inputBuffer, outputRecords);
})
);
@@ -159,15 +179,7 @@ List<Record<Event>> convertLambdaResponseToEvent(Buffer flushedBuffer,
// Handle null or empty payload
if (payload == null || payload.asByteArray() == null || payload.asByteArray().length == 0) {
LOG.warn(NOISY, "Lambda response payload is null or empty, dropping the original events");
// Set metrics
//requestPayloadMetric.set(flushedBuffer.getPayloadRequestSize());
//responsePayloadMetric.set(0);
} else {
// Set metrics
//requestPayloadMetric.set(flushedBuffer.getPayloadRequestSize());
//responsePayloadMetric.set(payload.asByteArray().length);

LOG.debug("Response payload:{}", payload.asUtf8String());
InputStream inputStream = new ByteArrayInputStream(payload.asByteArray());
//Convert to response codec
try {
@@ -188,12 +200,8 @@ List<Record<Event>> convertLambdaResponseToEvent(Buffer flushedBuffer,
return resultRecords;
} catch (Exception e) {
LOG.error(NOISY, "Error converting Lambda response to Event");
// Metrics update
//requestPayloadMetric.set(flushedBuffer.getPayloadRequestSize());
//responsePayloadMetric.set(0);
addFailureTags(flushedBuffer, originalRecords);
return originalRecords;
//????? handleFailure(e, flushedBuffer, resultRecords, failureHandler);
}
}

@@ -230,4 +238,4 @@ public boolean isReadyForShutdown() {
public void shutdown() {
}

}
}
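
Not part of the commit: the processor diff above swaps the AtomicLong-backed payload-size gauges for Micrometer DistributionSummary meters, records per-invocation latency into a Timer, and splits per-event counters from per-request counters. The sketch below illustrates that metering pattern with plain Micrometer; the SimpleMeterRegistry, the class name PayloadMetricsSketch, and the sample values are assumptions for illustration, not Data Prepper code.

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class PayloadMetricsSketch {
    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();

        // A gauge only exposes the last value written, so concurrent invocations
        // overwrite each other. A DistributionSummary keeps count/total/max across
        // every recorded payload size.
        DistributionSummary requestPayloadSize = DistributionSummary.builder("requestPayloadSize")
                .baseUnit("bytes")
                .register(registry);
        Timer lambdaLatency = registry.timer("lambdaProcessorLatency");
        Counter recordsSucceeded = registry.counter("lambdaProcessorObjectsEventsSucceeded");
        Counter requestsSucceeded = registry.counter("lambdaProcessorNumberOfRequestsSucceeded");

        // Simulated values standing in for two Lambda invocations, each carrying a batch of events.
        int[] payloadBytes = {2_048, 8_192};
        int[] eventsPerBatch = {10, 40};
        Duration[] latencies = {Duration.ofMillis(35), Duration.ofMillis(120)};

        for (int i = 0; i < payloadBytes.length; i++) {
            requestPayloadSize.record(payloadBytes[i]);
            lambdaLatency.record(latencies[i]);
            recordsSucceeded.increment(eventsPerBatch[i]); // one increment per event in the batch
            requestsSucceeded.increment();                 // one increment per Lambda request
        }

        System.out.printf("payload count=%d total=%.0f max=%.0f%n",
                requestPayloadSize.count(),
                requestPayloadSize.totalAmount(),
                requestPayloadSize.max());
        System.out.printf("latency count=%d mean=%.1f ms%n",
                lambdaLatency.count(),
                lambdaLatency.mean(TimeUnit.MILLISECONDS));
    }
}
```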
LambdaSink.java
@@ -6,10 +6,13 @@
package org.opensearch.dataprepper.plugins.lambda.sink;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.TimeUnit;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
@@ -37,23 +40,27 @@
@DataPrepperPlugin(name = "aws_lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class)
public class LambdaSink extends AbstractSink<Record<Event>> {

public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaProcessorObjectsEventsSucceeded";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaProcessorObjectsEventsFailed";
public static final String LAMBDA_LATENCY_METRIC = "lambdaProcessorLatency";
public static final String REQUEST_PAYLOAD_SIZE = "requestPayloadSize";
public static final String RESPONSE_PAYLOAD_SIZE = "responsePayloadSize";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaSinkObjectsEventsSucceeded";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaSinkObjectsEventsFailed";
public static final String NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA = "lambdaSinkNumberOfRequestsSucceeded";
public static final String NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA = "lambdaSinkNumberOfRequestsFailed";
public static final String LAMBDA_LATENCY_METRIC = "lambdaSinkLatency";
public static final String REQUEST_PAYLOAD_SIZE = "lambdaSinkRequestPayloadSize";
public static final String RESPONSE_PAYLOAD_SIZE = "lambdaSinkResponsePayloadSize";

private static final Logger LOG = LoggerFactory.getLogger(LambdaSink.class);
private static final String BUCKET = "bucket";
private static final String KEY_PATH = "key_path_prefix";
private final Counter numberOfRecordsSuccessCounter;
private final Counter numberOfRecordsFailedCounter;
private final Counter numberOfRequestsSuccessCounter;
private final Counter numberOfRequestsFailedCounter;
private final LambdaSinkConfig lambdaSinkConfig;
private final ExpressionEvaluator expressionEvaluator;
private final LambdaAsyncClient lambdaAsyncClient;
private final AtomicLong responsePayloadMetric;
private final DistributionSummary responsePayloadMetric;
private final Timer lambdaLatencyMetric;
private final AtomicLong requestPayloadMetric;
private final DistributionSummary requestPayloadMetric;
private final PluginSetting pluginSetting;
private volatile boolean sinkInitialized;
private DlqPushHandler dlqPushHandler = null;
@@ -72,13 +79,18 @@ public LambdaSink(final PluginSetting pluginSetting,
this.lambdaSinkConfig = lambdaSinkConfig;
this.expressionEvaluator = expressionEvaluator;
OutputCodecContext outputCodecContext = OutputCodecContext.fromSinkContext(sinkContext);
this.responsePayloadMetric = pluginMetrics.gauge(RESPONSE_PAYLOAD_SIZE, new AtomicLong());

this.numberOfRecordsSuccessCounter = pluginMetrics.counter(
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);
this.numberOfRecordsFailedCounter = pluginMetrics.counter(
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED);
this.numberOfRequestsSuccessCounter = pluginMetrics.counter(
NUMBER_OF_SUCCESSFUL_REQUESTS_TO_LAMBDA);
this.numberOfRequestsFailedCounter = pluginMetrics.counter(
NUMBER_OF_FAILED_REQUESTS_TO_LAMBDA);
this.lambdaLatencyMetric = pluginMetrics.timer(LAMBDA_LATENCY_METRIC);
this.requestPayloadMetric = pluginMetrics.gauge(REQUEST_PAYLOAD_SIZE, new AtomicLong());
this.requestPayloadMetric = pluginMetrics.summary(REQUEST_PAYLOAD_SIZE);
this.responsePayloadMetric = pluginMetrics.summary(RESPONSE_PAYLOAD_SIZE);
this.lambdaAsyncClient = LambdaClientFactory.createAsyncLambdaClient(
lambdaSinkConfig.getAwsAuthenticationOptions(),
lambdaSinkConfig.getMaxConnectionRetries(),
@@ -134,10 +146,18 @@ public void doOutput(final Collection<Record<Event>> records) {
lambdaSinkConfig,
lambdaAsyncClient,
(inputBuffer, invokeResponse) -> {
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount());
numberOfRequestsSuccessCounter.increment();
releaseEventHandlesPerBatch(true, inputBuffer);
return null;
},
(inputBuffer, invokeResponse) -> {
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
numberOfRecordsFailedCounter.increment(inputBuffer.getEventCount());
numberOfRequestsFailedCounter.increment();
handleFailure(new RuntimeException("failed"), inputBuffer);
});
}
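
Not part of the commit: in both the sink and processor callbacks above, latency is taken from the buffer's stopLatencyWatch() and pushed into the Timer once the async invocation completes. The sketch below shows the same idea with Micrometer's Timer.Sample wrapped around a CompletableFuture; the class name, the sleep-based stand-in for the Lambda round trip, and the reuse of the lambdaSinkLatency metric name are assumptions for illustration only.

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncLatencySketch {
    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();
        Timer lambdaSinkLatency = registry.timer("lambdaSinkLatency");

        // Start the watch when the request is sent and stop it in the async callback,
        // mirroring Buffer.stopLatencyWatch() followed by Timer.record(...) in the diff.
        Timer.Sample sample = Timer.start(registry);
        CompletableFuture<String> invocation = CompletableFuture.supplyAsync(() -> {
            sleep(50); // stand-in for the Lambda round trip
            return "ok";
        });

        invocation.whenComplete((response, error) -> sample.stop(lambdaSinkLatency));
        invocation.join();

        System.out.printf("invocations=%d, total=%.1f ms%n",
                lambdaSinkLatency.count(),
                lambdaSinkLatency.totalTime(TimeUnit.MILLISECONDS));
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```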
LambdaProcessorTest.java
@@ -167,9 +167,6 @@ public void setUp() throws Exception {
lambdaProcessor = new LambdaProcessor(pluginFactory, pluginMetrics, lambdaProcessorConfig,
awsCredentialsSupplier, expressionEvaluator);

// Inject mocks into the LambdaProcessor using reflection
populatePrivateFields();

// Mock InvokeResponse
when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("[{\"key\":\"value\"}]"));
when(invokeResponse.statusCode()).thenReturn(200); // Success status code
@@ -183,9 +180,6 @@ public void setUp() throws Exception {
invokeResponse);
when(lambdaAsyncClientMock.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture);

// Mock the checkStatusCode method
when(lambdaCommonHandler.checkStatusCode(any())).thenReturn(true);

// Mock Response Codec parse method
doNothing().when(responseCodec).parse(any(InputStream.class), any(Consumer.class));
