Lambda sink refactor #4766

Merged: 2 commits, Jul 27, 2024
Changes from 1 commit
72 changes: 72 additions & 0 deletions data-prepper-plugins/aws-lambda/README.md
@@ -0,0 +1,72 @@

# Lambda Processor
Review comments:

Collaborator: Remove the processor doc?

Collaborator: Let's keep the developer guide for running integration tests here. We can remove the usage sections after they are added to the OpenSearch documentation website.

Contributor Author: Do we want two different READMEs for the Lambda processor and sink?


This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.

## Usage
```yaml
lambda-pipeline:
  # ...
  processor:
    - aws_lambda:
        aws:
          region: "us-east-1"
          sts_role_arn: "<arn>"
        function_name: "uploadToS3Lambda"
        max_retries: 3
        mode: "synchronous"
        batch:
          batch_key: "osi_key"
          threshold:
            event_count: 3
            maximum_size: 6mb
            event_collect_timeout: 15s
```

## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.
The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.lambda.processor.region="us-east-1" -Dtests.lambda.processor.functionName="lambda_test_function" -Dtests.lambda.processor.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role"
```


# Lambda Sink

This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.

## Usage
```yaml
lambda-pipeline:
  # ...
  sink:
    - aws_lambda:
        aws:
          region: "us-east-1"
          sts_role_arn: "<arn>"
        function_name: "uploadToS3Lambda"
        max_retries: 3
        batch:
          batch_key: "osi_key"
          threshold:
            event_count: 3
            maximum_size: 6mb
            event_collect_timeout: 15s
        dlq:
          s3:
            bucket: test-bucket
            key_path_prefix: dlq/
```
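
As a rough illustration of how the `threshold` settings in the configuration above might be evaluated, the sketch below flushes a batch as soon as any one limit is reached. The class `BatchThreshold` and its method names are hypothetical and not taken from the plugin. The assumptions that the three limits are OR-ed together, and that an `event_count` of 0 disables the count check, are mine and are not confirmed by this diff.

```java
import java.time.Duration;

// Hypothetical helper, not part of the plugin: decides when a batch should be flushed.
final class BatchThreshold {
    private final int maxEvents;           // mirrors threshold.event_count (0 = check disabled, an assumption)
    private final long maxBytes;           // mirrors threshold.maximum_size, already converted to bytes
    private final Duration maxCollectTime; // mirrors threshold.event_collect_timeout

    BatchThreshold(int maxEvents, long maxBytes, Duration maxCollectTime) {
        this.maxEvents = maxEvents;
        this.maxBytes = maxBytes;
        this.maxCollectTime = maxCollectTime;
    }

    // Returns true when any single limit has been reached (assumed OR semantics).
    boolean shouldFlush(int eventCount, long payloadBytes, Duration elapsed) {
        boolean countReached = maxEvents > 0 && eventCount >= maxEvents;
        boolean sizeReached = payloadBytes >= maxBytes;
        boolean timeReached = elapsed.compareTo(maxCollectTime) >= 0;
        return countReached || sizeReached || timeReached;
    }
}
```

Under these assumptions, a batch holding three events would be flushed immediately with the example configuration, even if it is far below the size limit and the timeout.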

## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.
The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.lambda.sink.region="us-east-1" -Dtests.lambda.sink.functionName="lambda_test_function" -Dtests.lambda.sink.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role"
```
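
The `-D` flags on the command line only help if the test code looks the values up under the same keys that `build.gradle` forwards to the test JVM. A minimal sketch of such a lookup follows. `LambdaTestSettings` and its fields are hypothetical names, and the real integration tests may read the properties differently.

```java
import java.util.Objects;

// Hypothetical holder for the three test settings; not a class from the plugin.
final class LambdaTestSettings {
    final String region;
    final String functionName;
    final String stsRoleArn;

    private LambdaTestSettings(String region, String functionName, String stsRoleArn) {
        this.region = region;
        this.functionName = functionName;
        this.stsRoleArn = stsRoleArn;
    }

    // Reads the settings from JVM system properties under the given prefix,
    // for example "tests.lambda.sink". Fails fast if a value is missing.
    static LambdaTestSettings fromSystemProperties(String prefix) {
        return new LambdaTestSettings(
                require(prefix + ".region"),
                require(prefix + ".functionName"),
                require(prefix + ".sts_role_arn"));
    }

    private static String require(String key) {
        return Objects.requireNonNull(System.getProperty(key), "Missing system property: " + key);
    }
}
```

Failing fast on a missing key makes a mismatch between the command line and the forwarded property names visible immediately, instead of surfacing later as a null region or function name.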
Original file line number Diff line number Diff line change
@@ -26,8 +26,10 @@ dependencies {
testAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-plugins:parse-json-processor')
testImplementation testLibs.slf4j.simple
testImplementation 'org.mockito:mockito-core:4.6.1'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.2'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.2'
}

test {
@@ -59,9 +61,13 @@ task integrationTest(type: Test) {
classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'
- systemProperty 'tests.sink.lambda.region', System.getProperty('tests.sink.lambda.region')
- systemProperty 'tests.sink.lambda.functionName', System.getProperty('tests.sink.lambda.functionName')
- systemProperty 'tests.sink.lambda.sts_role_arn', System.getProperty('tests.sink.lambda.sts_role_arn')
+ systemProperty 'tests.lambda.sink.region', System.getProperty('tests.lambda.sink.region')
+ systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName')
+ systemProperty 'tests.lambda.sink.sts_role_arn', System.getProperty('tests.lambda.sink.sts_role_arn')

+ systemProperty 'tests.lambda.processor.region', System.getProperty('tests.lambda.processor.region')

Review comment (Collaborator): Remove the processor properties?

+ systemProperty 'tests.lambda.processor.functionName', System.getProperty('tests.lambda.processor.functionName')
+ systemProperty 'tests.lambda.processor.sts_role_arn', System.getProperty('tests.lambda.processor.sts_role_arn')

filter {
includeTestsMatching '*IT'
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

- package org.opensearch.dataprepper.plugins.sink.lambda;
+ package org.opensearch.dataprepper.plugins.lambda.sink;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,8 +35,6 @@
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
- import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkConfig;
- import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkService;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;
Original file line number Diff line number Diff line change
@@ -31,4 +31,18 @@ public interface Buffer {
SdkBytes getPayload();

void setEventCount(int eventCount);

+ //Metrics
+ public Duration getFlushLambdaSyncLatencyMetric();

+ public Long getPayloadRequestSyncSize();

+ public Duration getFlushLambdaAsyncLatencyMetric();

+ public Long getPayloadResponseSyncSize();

+ public Long getPayloadRequestAsyncSize();

+ public Long getPayloadResponseAsyncSize();

}
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
+ import software.amazon.awssdk.services.lambda.model.LambdaException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -31,7 +32,13 @@ public class InMemoryBuffer implements Buffer {
private final String invocationType;
private int eventCount;
private final StopWatch watch;
+ private final StopWatch lambdaSyncLatencyWatch;
+ private final StopWatch lambdaAsyncLatencyWatch;
private boolean isCodecStarted;
+ private long payloadRequestSyncSize;
+ private long payloadResponseSyncSize;
+ private long payloadRequestAsyncSize;
+ private long payloadResponseAsyncSize;


public InMemoryBuffer(LambdaClient lambdaClient, String functionName, String invocationType) {
@@ -44,6 +51,12 @@ public InMemoryBuffer(LambdaClient lambdaClient, String functionName, String inv
watch = new StopWatch();
watch.start();
isCodecStarted = false;
+ lambdaSyncLatencyWatch = new StopWatch();
+ lambdaAsyncLatencyWatch = new StopWatch();
+ payloadRequestSyncSize = 0;
+ payloadResponseSyncSize = 0;
+ payloadRequestAsyncSize = 0;
+ payloadResponseAsyncSize =0;
}

@Override
@@ -65,6 +78,7 @@ public Duration getDuration() {
public void flushToLambdaAsync() {
InvokeResponse resp;
SdkBytes payload = getPayload();
+ payloadRequestAsyncSize = payload.asByteArray().length;

// Setup an InvokeRequest.
InvokeRequest request = InvokeRequest.builder()
@@ -73,13 +87,17 @@ public void flushToLambdaAsync() {
.invocationType(invocationType)
.build();

- lambdaClient.invoke(request);
+ lambdaAsyncLatencyWatch.start();
+ resp = lambdaClient.invoke(request);
+ lambdaAsyncLatencyWatch.stop();
+ payloadResponseAsyncSize = resp.payload().asByteArray().length;
}

@Override
public InvokeResponse flushToLambdaSync() {
- InvokeResponse resp;
+ InvokeResponse resp = null;
SdkBytes payload = getPayload();
+ payloadRequestSyncSize = payload.asByteArray().length;

// Setup an InvokeRequest.
InvokeRequest request = InvokeRequest.builder()
@@ -88,8 +106,16 @@ public InvokeResponse flushToLambdaSync() {
.invocationType(invocationType)
.build();

- resp = lambdaClient.invoke(request);
- return resp;
+ lambdaSyncLatencyWatch.start();
+ try {
+ resp = lambdaClient.invoke(request);
+ payloadResponseSyncSize = resp.payload().asByteArray().length;
+ lambdaSyncLatencyWatch.stop();
+ return resp;
+ } catch (LambdaException e){
+ lambdaSyncLatencyWatch.stop();
+ throw new RuntimeException(e);
+ }
}

private SdkBytes validatePayload(String payload_string) {
@@ -121,6 +147,30 @@ public SdkBytes getPayload() {
byte[] bytes = byteArrayOutputStream.toByteArray();
SdkBytes sdkBytes = SdkBytes.fromByteArray(bytes);
return sdkBytes;
}
- }

+ public Duration getFlushLambdaSyncLatencyMetric (){
+ return Duration.ofMillis(lambdaSyncLatencyWatch.getTime(TimeUnit.MILLISECONDS));
+ }

+ public Duration getFlushLambdaAsyncLatencyMetric (){
+ return Duration.ofMillis(lambdaAsyncLatencyWatch.getTime(TimeUnit.MILLISECONDS));
+ }

+ public Long getPayloadRequestSyncSize() {
+ return payloadRequestSyncSize;
+ }

+ public Long getPayloadResponseSyncSize() {
+ return payloadResponseSyncSize;
+ }

+ public Long getPayloadRequestAsyncSize() {
+ return payloadRequestAsyncSize;
+ }

+ public Long getPayloadResponseAsyncSize() {
+ return payloadResponseAsyncSize;
+ }
+ }
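
One observation on the latency measurement in `flushToLambdaSync` and `flushToLambdaAsync`: the stopwatch is stopped on the success path and, in the sync case, for `LambdaException`, but any other exception thrown by the invoke would leave it running. A `try`/`finally` wrapper is one way to cover every path. The sketch below is standalone and uses `System.nanoTime()` rather than the `StopWatch` from the diff. `TimedCall` and its methods are hypothetical names, not code from this PR.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical helper, not code from this PR: times a call and records the
// elapsed duration whether the call returns normally or throws.
final class TimedCall {
    private Duration lastLatency = Duration.ZERO;

    <T> T run(Supplier<T> call) {
        long start = System.nanoTime();
        try {
            return call.get();
        } finally {
            // Runs on both the success path and the failure path.
            lastLatency = Duration.ofNanos(System.nanoTime() - start);
        }
    }

    Duration getLastLatency() {
        return lastLatency;
    }
}
```

With a helper of this shape, the invoke could be wrapped as `timer.run(() -> lambdaClient.invoke(request))`, so the latency is recorded even when the SDK throws something other than `LambdaException`. Whether that fits the buffer's lifecycle is a judgment for the authors.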

Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@

public class ThresholdOptions {

- private static final String DEFAULT_BYTE_CAPACITY = "6mb";
+ private static final String DEFAULT_BYTE_CAPACITY = "3mb";

@JsonProperty("event_count")
@Size(min = 0, max = 10000000, message = "event_count size should be between 0 and 10000000")
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@

import java.util.Collection;

- @DataPrepperPlugin(name = "lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class)
+ @DataPrepperPlugin(name = "aws_lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class)
public class LambdaSink extends AbstractSink<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(LambdaSink.class);
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@
import org.opensearch.dataprepper.model.types.ByteCount;

class ThresholdOptionsTest {
- private static final String DEFAULT_BYTE_CAPACITY = "6mb";
+ private static final String DEFAULT_BYTE_CAPACITY = "3mb";
private static final int DEFAULT_EVENT_COUNT = 0;

@Test
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ class LambdaSinkTest {

public static final String S3_REGION = "us-east-1";
public static final String CODEC_PLUGIN_NAME = "json";
- public static final String SINK_PLUGIN_NAME = "lambda";
+ public static final String SINK_PLUGIN_NAME = "aws_lambda";
public static final String SINK_PIPELINE_NAME = "lambda-sink-pipeline";
private LambdaSinkConfig lambdaSinkConfig;
private LambdaSink lambdaSink;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# To enable mocking of final classes with vanilla Mockito
# https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods
mock-maker-inline
36 changes: 0 additions & 36 deletions data-prepper-plugins/lambda/README.md

This file was deleted.

2 changes: 1 addition & 1 deletion settings.gradle
@@ -178,4 +178,4 @@ include 'data-prepper-plugins:mongodb'
include 'data-prepper-plugins:rds-source'
include 'data-prepper-plugins:http-source-common'
include 'data-prepper-plugins:http-common'
- include 'data-prepper-plugins:lambda'
+ include 'data-prepper-plugins:aws-lambda'