Add support for lambda sink
Signed-off-by: srigovs <[email protected]>
srikanthjg committed May 9, 2024
1 parent f5b0fee commit f0fcd11
Showing 42 changed files with 2,593 additions and 17 deletions.
@@ -13,8 +13,11 @@
import org.opensearch.dataprepper.parser.PipelineTransformer;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader;
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationReader;
import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser;
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
import org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer;
@@ -31,7 +31,10 @@
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderReceiveBuffer;
import org.opensearch.dataprepper.pipeline.Pipeline;
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader;
import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.plugin.DefaultPluginFactory;
@@ -26,33 +26,47 @@
@DataPrepperPlugin(name = "flatten", pluginType = Processor.class, pluginConfigurationType = FlattenProcessorConfig.class)
public class FlattenProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(FlattenProcessor.class);
private static final String SEPARATOR = "/";
private final FlattenProcessorConfig config;
private final ExpressionEvaluator expressionEvaluator;
private final Map<String, String> excludeKeysAndJsonPointers = new HashMap<>();

@DataPrepperPluginConstructor
public FlattenProcessor(final PluginMetrics pluginMetrics, final FlattenProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.config = config;
this.expressionEvaluator = expressionEvaluator;

for (final String key : config.getExcludeKeys()) {
excludeKeysAndJsonPointers.put(key, getJsonPointer(config.getSource(), key));
}
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();
final Map<String, Object> excludeMap = new HashMap<>();

try {
if (config.getFlattenWhen() != null && !expressionEvaluator.evaluateConditional(config.getFlattenWhen(), recordEvent)) {
continue;
}

// remove fields specified in "exclude_keys" from the event temporarily
for (final String key : excludeKeysAndJsonPointers.keySet()) {
final String keyInEvent = excludeKeysAndJsonPointers.get(key);
@@ -62,6 +76,8 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}
}

final String sourceJson = recordEvent.getAsJsonString(config.getSource());

// adds ignoreReservedCharacters() so that dots in keys are ignored during flattening
@@ -83,12 +99,15 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
} catch (Exception e) {
LOG.error("Fail to perform flatten operation", e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
} finally {
// Add temporarily deleted fields back
for (final String key : excludeMap.keySet()) {
final String keyInEvent = excludeKeysAndJsonPointers.get(key);
recordEvent.put(keyInEvent, excludeMap.get(key));
}
}
}
return records;
@@ -8,13 +8,19 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;

import java.util.ArrayList;
import java.util.List;

public class FlattenProcessorConfig {

private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();

@NotNull
@JsonProperty("source")
private String source;
@@ -29,9 +35,12 @@ public class FlattenProcessorConfig {
@JsonProperty("remove_list_indices")
private boolean removeListIndices = false;

@JsonProperty("exclude_keys")
private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;

@JsonProperty("flatten_when")
private String flattenWhen;

@@ -54,10 +63,13 @@ public boolean isRemoveListIndices() {
return removeListIndices;
}

public List<String> getExcludeKeys() {
return excludeKeys;
}

public String getFlattenWhen() {
return flattenWhen;
}
@@ -7,8 +7,11 @@

import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@@ -23,6 +26,9 @@ void testDefaultConfig() {
assertThat(FlattenProcessorConfig.isRemoveListIndices(), equalTo(false));
assertThat(FlattenProcessorConfig.getFlattenWhen(), equalTo(null));
assertThat(FlattenProcessorConfig.getTagsOnFailure(), equalTo(null));
assertThat(FlattenProcessorConfig.getExcludeKeys(), equalTo(List.of()));
}
}
@@ -8,9 +8,12 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
@@ -25,7 +28,10 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -54,7 +60,10 @@ void setUp() {
lenient().when(mockConfig.isRemoveListIndices()).thenReturn(false);
lenient().when(mockConfig.getFlattenWhen()).thenReturn(null);
lenient().when(mockConfig.getTagsOnFailure()).thenReturn(new ArrayList<>());
lenient().when(mockConfig.getExcludeKeys()).thenReturn(new ArrayList<>());
}

@Test
@@ -226,6 +235,7 @@ void testFailureTagsAreAddedWhenException() {
assertThat(resultEvent.getMetadata().getTags(), is(new HashSet<>(testTags)));
}

@ParameterizedTest
@MethodSource("excludeKeysTestArguments")
void testFlattenWithExcludeKeys(String source, String target, List<String> excludeKeys, Map<String, Object> expectedResultMap) {
@@ -330,6 +340,8 @@ private static Stream<Arguments> excludeKeysTestArguments() {
);
}

private FlattenProcessor createObjectUnderTest() {
return new FlattenProcessor(pluginMetrics, mockConfig, expressionEvaluator);
}
@@ -25,7 +25,7 @@
public class TrustStoreProvider {
private static final Logger LOG = LoggerFactory.getLogger(TrustStoreProvider.class);

public static TrustManager[] createTrustManager(final Path certificatePath) {
public TrustManager[] createTrustManager(final Path certificatePath) {
LOG.info("Using the certificate path {} to create trust manager.", certificatePath.toString());
try {
final KeyStore keyStore = createKeyStore(certificatePath);
@@ -37,7 +37,7 @@ public static TrustManager[] createTrustManager(final Path certificatePath) {
}
}

public static TrustManager[] createTrustManager(final String certificateContent) {
public TrustManager[] createTrustManager(final String certificateContent) {
LOG.info("Using the certificate content to create trust manager.");
try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) {
final KeyStore keyStore = createKeyStore(certificateInputStream);
@@ -49,26 +49,30 @@ public static TrustManager[] createTrustManager(final String certificateContent)
}
}

public static TrustManager[] createTrustAllManager() {
public TrustManager[] createTrustAllManager() {
LOG.info("Using the trust all manager to create trust manager.");
return new TrustManager[]{
new X509TrustAllManager()
};
}

private static KeyStore createKeyStore(final Path certificatePath) throws Exception {
private KeyStore createKeyStore(final Path certificatePath) throws Exception {
try (InputStream certificateInputStream = Files.newInputStream(certificatePath)) {
return createKeyStore(certificateInputStream);
}
}

private KeyStore createKeyStore(final InputStream trustStoreInputStream, final String password) throws Exception {
final KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(trustStoreInputStream, password.toCharArray());
return trustStore;
}

private KeyStore createKeyStore(final InputStream certificateInputStream) throws Exception {
final CertificateFactory factory = CertificateFactory.getInstance("X.509");
final Certificate trustedCa = factory.generateCertificate(certificateInputStream);
final KeyStore trustStore = KeyStore.getInstance("pkcs12");
@@ -77,7 +81,7 @@ private static KeyStore createKeyStore(final InputStream certificateInputStream)
return trustStore;
}

public static SSLContext createSSLContext(final Path certificatePath) {
public SSLContext createSSLContext(final Path certificatePath) {
LOG.info("Using the certificate path to create SSL context.");
try (InputStream is = Files.newInputStream(certificatePath)) {
return createSSLContext(is);
@@ -86,6 +90,9 @@ public static SSLContext createSSLContext(final Path certificatePath) {
}
}

public SSLContext createSSLContext(final Path trustStorePath, final String password) {
LOG.info("Using the truststore path and password to create SSL context.");
try (InputStream is = Files.newInputStream(trustStorePath)) {
@@ -96,6 +103,7 @@ public SSLContext createSSLContext(final Path trustStorePath, final Strin
}

public SSLContext createSSLContext(final String certificateContent) {
LOG.info("Using the certificate content to create SSL context.");
try (InputStream certificateInputStream = new ByteArrayInputStream(certificateContent.getBytes())) {
return createSSLContext(certificateInputStream);
@@ -121,7 +129,7 @@ private static SSLContext createSSLContext(final InputStream certificateInputStr
return sslContext;
}

public static SSLContext createSSLContextWithTrustAllStrategy() {
public SSLContext createSSLContextWithTrustAllStrategy() {
LOG.info("Using the trust all strategy to create SSL context.");
try {
return SSLContexts.custom().loadTrustMaterial(null, new TrustStrategy() {
@@ -28,11 +28,12 @@ public void configure(Map<String, ?> configs) {
}

private TrustManager[] getTrustManager() {
final TrustStoreProvider trustStoreProvider = new TrustStoreProvider();
final TrustManager[] trustManagers;
if (Objects.nonNull(certificateContent)) {
trustManagers = TrustStoreProvider.createTrustManager(certificateContent);
trustManagers = trustStoreProvider.createTrustManager(certificateContent);
} else {
trustManagers = TrustStoreProvider.createTrustAllManager();
trustManagers = trustStoreProvider.createTrustAllManager();
}
return trustManagers;
}
@@ -193,4 +193,4 @@ private KafkaSourceConfig createKafkaSinkConfig(final String fileName) throws IO
final Reader reader = new StringReader(json);
return mapper.readValue(reader, KafkaSourceConfig.class);
}
}
}
36 changes: 36 additions & 0 deletions data-prepper-plugins/lambda-sink/README.md
@@ -0,0 +1,36 @@
# Lambda Sink

This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.

## Usage
```yaml
lambda-pipeline:
...
sink:
- lambda:
aws:
region: "us-east-1"
sts_role_arn: "<arn>"
function_name: "uploadToS3Lambda"
max_retries: 3
batch:
batch_key: "osi_key"
threshold:
event_count: 3
maximum_size: 6mb
event_collect_timeout: 15s
dlq:
s3:
bucket: test-bucket
key_path_prefix: dlq/
```
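
The sink invokes the configured Lambda function once per assembled batch. As a minimal sketch of the receiving side (assuming the batch arrives as a JSON array of event objects; the handler class name and payload shape here are illustrative, not part of this plugin), a Java handler might look like:

```java
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import java.util.List;
import java.util.Map;

// Hypothetical handler for the "uploadToS3Lambda" function in the pipeline above.
// Assumes each invocation carries one batch, deserialized as a list of event maps.
public class UploadToS3Lambda implements RequestHandler<List<Map<String, Object>>, String> {
    @Override
    public String handleRequest(final List<Map<String, Object>> events, final Context context) {
        context.getLogger().log("Received a batch of " + events.size() + " events");
        // ... upload to S3 or otherwise process the events ...
        return "OK";
    }
}
```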

## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.
The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:lambda-sink:integrationTest -Dtests.sink.lambda.region="us-east-1" -Dtests.sink.lambda.functionName="lambda_test_function" -Dtests.sink.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role"
```
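Replace the region, function name, and role ARN above with values from your own AWS test account.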