Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial poc of new sqs-source plugin #5197

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions data-prepper-plugins/sqs-source-new/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# SQS Source New

This source allows Data Prepper to use SQS as a source. It reads messages from specified SQS queues and processes them into events.

## Example Configuration

```yaml
sqs-test-pipeline:
source:
sqs-source-new:
queues:
- url: <SQS_QUEUE_URL_1>
batch_size: 10
workers: 2
- url: <SQS_QUEUE_URL_2>
batch_size: 5
workers: 3
aws:
region: <AWS_REGION>
sts_role_arn: <IAM_ROLE_ARN>
sink:
- stdout:
28 changes: 28 additions & 0 deletions data-prepper-plugins/sqs-source-new/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:common')
implementation libs.armeria.core
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:sqs'
implementation 'software.amazon.awssdk:arns'
implementation 'software.amazon.awssdk:sts'
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:blocking-buffer')
}
test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.sqssourcenew;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

class AwsAuthenticationAdapter {
private final AwsCredentialsSupplier awsCredentialsSupplier;
private final SqsSourceConfig sqsSourceConfig;


AwsAuthenticationAdapter(
final AwsCredentialsSupplier awsCredentialsSupplier,
final SqsSourceConfig sqsSourceConfig) {
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.sqsSourceConfig = sqsSourceConfig;
}

AwsCredentialsProvider getCredentialsProvider() {
final AwsAuthenticationOptions awsAuthenticationOptions = sqsSourceConfig.getAwsAuthenticationOptions();

final AwsCredentialsOptions options = AwsCredentialsOptions.builder()
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
.withRegion(awsAuthenticationOptions.getAwsRegion())
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
.withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId())
.build();

return awsCredentialsSupplier.getProvider(options);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.sqssourcenew;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.regions.Region;

import java.util.Map;
import java.util.Optional;

public class AwsAuthenticationOptions {
private static final String AWS_IAM_ROLE = "role";
private static final String AWS_IAM = "iam";

@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
private String awsStsRoleArn;

@JsonProperty("sts_external_id")
@Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
private String awsStsExternalId;

@JsonProperty("sts_header_overrides")
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

private void validateStsRoleArn() {
final Arn arn = getArn();
if (!AWS_IAM.equals(arn.service())) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
final Optional<String> resourceType = arn.resource().resourceType();
if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
}

private Arn getArn() {
try {
return Arn.fromString(awsStsRoleArn);
} catch (final Exception e) {
throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
}
}

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}

public String getAwsStsExternalId() {
return awsStsExternalId;
}

public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package org.opensearch.dataprepper.plugins.source.sqssourcenew;

import com.fasterxml.jackson.annotation.JsonProperty;

import jakarta.validation.Valid;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;

import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;

public class QueueConfig {

private static final int DEFAULT_MAXIMUM_MESSAGES = 10;
private static final Boolean DEFAULT_VISIBILITY_DUPLICATE_PROTECTION = false;
private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = Duration.ofSeconds(30);
private static final Duration DEFAULT_VISIBILITY_DUPLICATE_PROTECTION_TIMEOUT = Duration.ofHours(2);
private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20);
private static final Duration DEFAULT_POLL_DELAY_SECONDS = Duration.ofSeconds(0);
static final int DEFAULT_NUMBER_OF_WORKERS = 1;
private static final int DEFAULT_BATCH_SIZE = 10;

@JsonProperty("url")
@NotNull
private String url;

@JsonProperty("workers")
@Valid
private int numWorkers = DEFAULT_NUMBER_OF_WORKERS;

@JsonProperty("maximum_messages")
@Min(1)
@Max(10)
private int maximumMessages = DEFAULT_MAXIMUM_MESSAGES;

@JsonProperty("batch_size")
@Max(10)
private Integer batchSize = DEFAULT_BATCH_SIZE;

@JsonProperty("polling_frequency")
private Duration pollingFrequency = Duration.ZERO;

@JsonProperty("poll_delay")
@DurationMin(seconds = 0)
private Duration pollDelay = DEFAULT_POLL_DELAY_SECONDS;

@JsonProperty("visibility_timeout")
@DurationMin(seconds = 0)
@DurationMax(seconds = 43200)
private Duration visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT_SECONDS;

@JsonProperty("visibility_duplication_protection")
private Boolean visibilityDuplicateProtection = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION;

@JsonProperty("visibility_duplicate_protection_timeout")
@DurationMin(seconds = 30)
@DurationMax(hours = 24)
private Duration visibilityDuplicateProtectionTimeout = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION_TIMEOUT;

@JsonProperty("wait_time")
@DurationMin(seconds = 0)
@DurationMax(seconds = 20)
private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS;

public String getUrl() {
return url;
}

public Duration getPollingFrequency() {
return pollingFrequency;
}

public Integer getBatchSize() {
return batchSize;
}

public int getMaximumMessages() {
return maximumMessages;
}

public int getNumWorkers() {
return numWorkers;
}

public Duration getVisibilityTimeout() {
return visibilityTimeout;
}

public Boolean getVisibilityDuplicateProtection() {
return visibilityDuplicateProtection;
}

public Duration getVisibilityDuplicateProtectionTimeout() {
return visibilityDuplicateProtectionTimeout;
}

public Duration getWaitTime() {
return waitTime;
}

public Duration getPollDelay() {
return pollDelay;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.opensearch.dataprepper.plugins.source.sqssourcenew;

/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.model.Message;
import java.util.Map;
import java.util.Objects;

/**
* Implements the SqsMessageHandler to read and parse SQS messages generically and push to buffer.
*/
public class RawSqsMessageHandler implements SqsMessageHandler {

private static final Logger LOG = LoggerFactory.getLogger(RawSqsMessageHandler.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* Processes the SQS message, attempting to parse it as JSON, and adds it to the buffer.
*
* @param message - the SQS message for processing
* @param bufferAccumulator - the buffer accumulator
* @param acknowledgementSet - the acknowledgement set for end-to-end acknowledgements
*/
@Override
public void handleMessage(final Message message,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet) {
try {
Event event;

String messageBody = message.body();

// Default is to try to parse the message body as JSON
try {
JsonNode jsonNode = objectMapper.readTree(messageBody);

event = JacksonEvent.builder()
.withEventType("sqs-event")
.withData(jsonNode)
.build();

} catch (Exception e) {
// Treat the message as plain text if json parsing doesn't work
LOG.debug("Message body is not valid JSON. Treating as plain text.");
event = JacksonEvent.builder()
.withEventType("sqs-event")
.withData(Map.of("message", messageBody))
.build();
}

if (Objects.nonNull(acknowledgementSet)) {
acknowledgementSet.add(event);
}

bufferAccumulator.add(new Record<>(event));

} catch (Exception e) {
LOG.error("Error processing SQS message: {}", e.getMessage(), e);
throw new RuntimeException(e);
}

try {
bufferAccumulator.flush();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.sqssourcenew;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import software.amazon.awssdk.services.sqs.model.Message;

import java.io.IOException;

public class SqsEventProcessor {
private final SqsMessageHandler sqsMessageHandler;
SqsEventProcessor(final SqsMessageHandler sqsMessageHandler) {
this.sqsMessageHandler= sqsMessageHandler;
}

void addSqsObject(final Message message,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSet acknowledgementSet) throws IOException {
sqsMessageHandler.handleMessage(message, bufferAccumulator, acknowledgementSet);
}

}
Loading
Loading