Skip to content

Commit

Permalink
Refactor lambda plugin (opensearch-project#4643)
Browse files Browse the repository at this point in the history
* Refactor lambda plugin

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Address comments

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Address comments 2

Signed-off-by: Srikanth Govindarajan <[email protected]>

---------

Signed-off-by: Srikanth Govindarajan <[email protected]>
Signed-off-by: Srikanth Govindarajan <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
srikanthjg authored and Krishna Kondaka committed Jul 23, 2024
1 parent 6f11497 commit 20c637b
Show file tree
Hide file tree
Showing 30 changed files with 163 additions and 146 deletions.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ dependencies {
implementation'org.json:json'
implementation libs.commons.lang3
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'org.projectlombok:lombok:1.18.22'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
testCompileOnly 'org.projectlombok:lombok:1.18.20'
testAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-plugins:parse-json-processor')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.mockito.Mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
Expand All @@ -29,12 +30,14 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions;
import org.opensearch.dataprepper.plugins.sink.lambda.config.ThresholdOptions;
import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkConfig;
import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkService;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;

Expand All @@ -45,8 +48,6 @@
import java.util.HashMap;
import java.util.List;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class LambdaSinkServiceIT {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda.accumlator;
package org.opensearch.dataprepper.plugins.lambda.common.accumlator;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

import java.io.OutputStream;
import java.time.Duration;
Expand All @@ -21,7 +22,9 @@ public interface Buffer {

Duration getDuration();

void flushToLambda();
void flushToLambdaAsync();

InvokeResponse flushToLambdaSync();

OutputStream getOutputStream();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda.accumlator;
package org.opensearch.dataprepper.plugins.lambda.common.accumlator;

import software.amazon.awssdk.services.lambda.LambdaClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda.accumlator;
package org.opensearch.dataprepper.plugins.lambda.common.accumlator;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -62,7 +62,22 @@ public Duration getDuration() {


@Override
public void flushToLambda() {
public void flushToLambdaAsync() {
InvokeResponse resp;
SdkBytes payload = getPayload();

// Setup an InvokeRequest.
InvokeRequest request = InvokeRequest.builder()
.functionName(functionName)
.payload(payload)
.invocationType(invocationType)
.build();

lambdaClient.invoke(request);
}

@Override
public InvokeResponse flushToLambdaSync() {
InvokeResponse resp;
SdkBytes payload = getPayload();

Expand All @@ -74,6 +89,7 @@ public void flushToLambda() {
.build();

resp = lambdaClient.invoke(request);
return resp;
}

private SdkBytes validatePayload(String payload_string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda.accumlator;
package org.opensearch.dataprepper.plugins.lambda.common.accumlator;

import software.amazon.awssdk.services.lambda.LambdaClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.lambda.codec;
package org.opensearch.dataprepper.plugins.lambda.common.codec;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
Expand Down Expand Up @@ -37,7 +37,6 @@ public String getExtension() {
@Override
public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException {
Objects.requireNonNull(outputStream);
Objects.requireNonNull(codecContext);
this.codecContext = codecContext;
generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
if(Objects.nonNull(keyName)){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda.config;
package org.opensearch.dataprepper.plugins.lambda.common.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda.config;
package org.opensearch.dataprepper.plugins.lambda.common.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
Expand All @@ -18,7 +18,7 @@ public class BatchOptions {

@JsonProperty("threshold")
@NotNull
ThresholdOptions thresholdOptions;
ThresholdOptions thresholdOptions = new ThresholdOptions();

public String getBatchKey(){return batchKey;}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda.config;
package org.opensearch.dataprepper.plugins.lambda.common.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;
import org.opensearch.dataprepper.model.types.ByteCount;

import java.time.Duration;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;


public class ThresholdOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda;
package org.opensearch.dataprepper.plugins.lambda.common.util;

import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;

import java.time.Duration;

Expand All @@ -15,9 +15,6 @@
*/
public class ThresholdCheck {

private ThresholdCheck() {
}

public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final Duration maxCollectionDuration, final Boolean isBatchEnabled) {
if (!isBatchEnabled) return true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda;
package org.opensearch.dataprepper.plugins.lambda.sink;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda;
package org.opensearch.dataprepper.plugins.lambda.sink;

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -17,9 +17,9 @@
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.lambda.LambdaClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.lambda;
package org.opensearch.dataprepper.plugins.lambda.sink;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;

import java.util.Objects;
import java.util.Map;
import java.util.Objects;

public class LambdaSinkConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,30 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda;
package org.opensearch.dataprepper.plugins.lambda.sink;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.lambda.codec.LambdaJsonCodec;
import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.sink.lambda.dlq.LambdaSinkFailedDlqData;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.codec.LambdaJsonCodec;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.LambdaSinkFailedDlqData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.lambda.LambdaClient;

Expand All @@ -48,7 +49,7 @@ public class LambdaSinkService {
private final PluginSetting pluginSetting;
private final Lock reentrantLock;
private final LambdaSinkConfig lambdaSinkConfig;
private LambdaClient lambdaClient;
private final LambdaClient lambdaClient;
private final String functionName;
private int maxEvents = 0;
private ByteCount maxBytes = null;
Expand All @@ -65,9 +66,9 @@ public class LambdaSinkService {
private final List<Event> events;
private OutputCodec codec = null;
private final BatchOptions batchOptions;
private Boolean isBatchEnabled;
private final Boolean isBatchEnabled;
private OutputCodecContext codecContext = null;
private String batchKey;
private final String batchKey;

public LambdaSinkService(final LambdaClient lambdaClient,
final LambdaSinkConfig lambdaSinkConfig,
Expand Down Expand Up @@ -213,7 +214,7 @@ protected boolean retryFlushToLambda(Buffer currentBuffer,
do {

try {
currentBuffer.flushToLambda();
currentBuffer.flushToLambdaAsync();
isUploadedToLambda = Boolean.TRUE;
} catch (AwsServiceException | SdkClientException e) {
errorMsgObj.set(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.lambda.dlq;
package org.opensearch.dataprepper.plugins.lambda.sink.dlq;

import com.fasterxml.jackson.databind.ObjectWriter;
import io.micrometer.core.instrument.util.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.lambda.dlq;
package org.opensearch.dataprepper.plugins.lambda.sink.dlq;

import com.fasterxml.jackson.core.JsonProcessingException;
import software.amazon.awssdk.core.SdkBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda;
package org.opensearch.dataprepper.plugins.lambda.common;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import static org.mockito.Mockito.when;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck;

import java.io.IOException;
import java.time.Duration;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class ThresholdCheckTest {

Expand Down
Loading

0 comments on commit 20c637b

Please sign in to comment.