Skip to content

Commit

Permalink
initial poc of new sqs-source plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeremy Michael committed Nov 17, 2024
1 parent 85f1718 commit 15e19ff
Show file tree
Hide file tree
Showing 14 changed files with 965 additions and 1 deletion.
22 changes: 22 additions & 0 deletions data-prepper-plugins/sqs-source-new/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# SQS Source New

This source allows Data Prepper to use SQS as a source. It reads messages from specified SQS queues and processes them into events.

## Example Configuration

```yaml
sqs-test-pipeline:
source:
sqs-source-new:
queues:
- url: <SQS_QUEUE_URL_1>
batch_size: 10
workers: 2
- url: <SQS_QUEUE_URL_2>
batch_size: 5
workers: 3
aws:
region: <AWS_REGION>
sts_role_arn: <IAM_ROLE_ARN>
sink:
- stdout:
28 changes: 28 additions & 0 deletions data-prepper-plugins/sqs-source-new/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:common')
implementation libs.armeria.core
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:sqs'
implementation 'software.amazon.awssdk:arns'
implementation 'software.amazon.awssdk:sts'
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:blocking-buffer')
}
test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.sqssourcenew;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

class AwsAuthenticationAdapter {
private final AwsCredentialsSupplier awsCredentialsSupplier;
private final SqsSourceConfig sqsSourceConfig;


AwsAuthenticationAdapter(
final AwsCredentialsSupplier awsCredentialsSupplier,
final SqsSourceConfig sqsSourceConfig) {
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.sqsSourceConfig = sqsSourceConfig;
}

AwsCredentialsProvider getCredentialsProvider() {
final AwsAuthenticationOptions awsAuthenticationOptions = sqsSourceConfig.getAwsAuthenticationOptions();

final AwsCredentialsOptions options = AwsCredentialsOptions.builder()
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
.withRegion(awsAuthenticationOptions.getAwsRegion())
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
.withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId())
.build();

return awsCredentialsSupplier.getProvider(options);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.sqssourcenew;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;

import java.util.Map;
import java.util.Optional;

public class AwsAuthenticationOptions {
private static final String AWS_IAM_ROLE = "role";
private static final String AWS_IAM = "iam";

@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
private String awsStsRoleArn;

@JsonProperty("sts_external_id")
@Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
private String awsStsExternalId;

@JsonProperty("sts_header_overrides")
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

private void validateStsRoleArn() {
final Arn arn = getArn();
if (!AWS_IAM.equals(arn.service())) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
final Optional<String> resourceType = arn.resource().resourceType();
if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
}

private Arn getArn() {
try {
return Arn.fromString(awsStsRoleArn);
} catch (final Exception e) {
throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
}
}

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}

public String getAwsStsExternalId() {
return awsStsExternalId;
}

public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package org.opensearch.dataprepper.plugins.source.sqssourcenew;

import com.fasterxml.jackson.annotation.JsonProperty;

import jakarta.validation.Valid;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;

import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;

public class QueueConfig {

private static final int DEFAULT_MAXIMUM_MESSAGES = 10;
private static final Boolean DEFAULT_VISIBILITY_DUPLICATE_PROTECTION = false;
private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = Duration.ofSeconds(30);
private static final Duration DEFAULT_VISIBILITY_DUPLICATE_PROTECTION_TIMEOUT = Duration.ofHours(2);
private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20);
private static final Duration DEFAULT_POLL_DELAY_SECONDS = Duration.ofSeconds(0);
static final int DEFAULT_NUMBER_OF_WORKERS = 1;
private static final int DEFAULT_BATCH_SIZE = 10;

@JsonProperty("url")
@NotNull
private String url;

@JsonProperty("workers")
@Valid
private int numWorkers = DEFAULT_NUMBER_OF_WORKERS;

@JsonProperty("maximum_messages")
@Min(1)
@Max(10)
private int maximumMessages = DEFAULT_MAXIMUM_MESSAGES;

@JsonProperty("batch_size")
@Max(10)
private Integer batchSize = DEFAULT_BATCH_SIZE;

@JsonProperty("polling_frequency")
private Duration pollingFrequency = Duration.ZERO;

@JsonProperty("poll_delay")
@DurationMin(seconds = 0)
private Duration pollDelay = DEFAULT_POLL_DELAY_SECONDS;

@JsonProperty("visibility_timeout")
@DurationMin(seconds = 0)
@DurationMax(seconds = 43200)
private Duration visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT_SECONDS;

@JsonProperty("visibility_duplication_protection")
private Boolean visibilityDuplicateProtection = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION;

@JsonProperty("visibility_duplicate_protection_timeout")
@DurationMin(seconds = 30)
@DurationMax(hours = 24)
private Duration visibilityDuplicateProtectionTimeout = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION_TIMEOUT;

@JsonProperty("wait_time")
@DurationMin(seconds = 0)
@DurationMax(seconds = 20)
private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS;

public String getUrl() {
return url;
}

public Duration getPollingFrequency() {
return pollingFrequency;
}

public Integer getBatchSize() {
return batchSize;
}

public int getMaximumMessages() {
return maximumMessages;
}

public int getNumWorkers() {
return numWorkers;
}

public Duration getVisibilityTimeout() {
return visibilityTimeout;
}

public Boolean getVisibilityDuplicateProtection() {
return visibilityDuplicateProtection;
}

public Duration getVisibilityDuplicateProtectionTimeout() {
return visibilityDuplicateProtectionTimeout;
}

public Duration getWaitTime() {
return waitTime;
}

public Duration getPollDelay() {
return pollDelay;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.opensearch.dataprepper.plugins.source.sqssourcenew;

/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.model.Message;
import java.util.Map;
import java.util.Objects;

/**
* Implements the SqsMessageHandler to read and parse SQS messages generically and push to buffer.
*/
public class RawSqsMessageHandler implements SqsMessageHandler {

private static final Logger LOG = LoggerFactory.getLogger(RawSqsMessageHandler.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* Processes the SQS message, attempting to parse it as JSON, and adds it to the buffer.
*
* @param message - the SQS message for processing
* @param bufferAccumulator - the buffer accumulator
* @param acknowledgementSet - the acknowledgement set for end-to-end acknowledgements
*/
@Override
public void handleMessage(final Message message,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet) {
try {
Event event;

String messageBody = message.body();

// Default is to try to parse the message body as JSON
try {
JsonNode jsonNode = objectMapper.readTree(messageBody);

event = JacksonEvent.builder()
.withEventType("sqs-event")
.withData(jsonNode)
.build();

} catch (Exception e) {
// Treat the message as plain text if json parsing doesn't work
LOG.debug("Message body is not valid JSON. Treating as plain text.");
event = JacksonEvent.builder()
.withEventType("sqs-event")
.withData(Map.of("message", messageBody))
.build();
}

if (Objects.nonNull(acknowledgementSet)) {
acknowledgementSet.add(event);
}

bufferAccumulator.add(new Record<>(event));

} catch (Exception e) {
LOG.error("Error processing SQS message: {}", e.getMessage(), e);
throw new RuntimeException(e);
}

try {
bufferAccumulator.flush();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.sqssourcenew;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import software.amazon.awssdk.services.sqs.model.Message;

import java.io.IOException;

public class SqsEventProcessor {
private final SqsMessageHandler sqsMessageHandler;
SqsEventProcessor(final SqsMessageHandler sqsMessageHandler) {
this.sqsMessageHandler= sqsMessageHandler;
}

void addSqsObject(final Message message,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet) throws IOException {
sqsMessageHandler.handleMessage(message, bufferAccumulator, acknowledgementSet);
}

}
Loading

0 comments on commit 15e19ff

Please sign in to comment.