From 15e19ff0240aede2aa7fe7eacd7cb3203b4edc6a Mon Sep 17 00:00:00 2001 From: Jeremy Michael Date: Sat, 16 Nov 2024 16:36:00 -0800 Subject: [PATCH] initial poc of new sqs-source plugin --- data-prepper-plugins/sqs-source-new/README.md | 22 ++ .../sqs-source-new/build.gradle | 28 ++ .../AwsAuthenticationAdapter.java | 36 ++ .../AwsAuthenticationOptions.java | 70 ++++ .../source/sqssourcenew/QueueConfig.java | 107 ++++++ .../sqssourcenew/RawSqsMessageHandler.java | 80 +++++ .../sqssourcenew/SqsEventProcessor.java | 29 ++ .../sqssourcenew/SqsMessageHandler.java | 19 ++ .../SqsRetriesExhaustedException.java | 13 + .../source/sqssourcenew/SqsService.java | 128 +++++++ .../source/sqssourcenew/SqsSource.java | 68 ++++ .../source/sqssourcenew/SqsSourceConfig.java | 52 +++ .../source/sqssourcenew/SqsWorker.java | 311 ++++++++++++++++++ settings.gradle | 3 +- 14 files changed, 965 insertions(+), 1 deletion(-) create mode 100644 data-prepper-plugins/sqs-source-new/README.md create mode 100644 data-prepper-plugins/sqs-source-new/build.gradle create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/AwsAuthenticationAdapter.java create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/AwsAuthenticationOptions.java create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/QueueConfig.java create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/RawSqsMessageHandler.java create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsEventProcessor.java create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsMessageHandler.java create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsRetriesExhaustedException.java create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsService.java create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsSource.java create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsSourceConfig.java create mode 100644 data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsWorker.java diff --git a/data-prepper-plugins/sqs-source-new/README.md b/data-prepper-plugins/sqs-source-new/README.md new file mode 100644 index 0000000000..83243cdfee --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/README.md @@ -0,0 +1,22 @@ +# SQS Source New + +This source allows Data Prepper to use SQS as a source. It reads messages from specified SQS queues and processes them into events. + +## Example Configuration + +```yaml +sqs-test-pipeline: + source: + sqs-source-new: + queues: + - url: + batch_size: 10 + workers: 2 + - url: + batch_size: 5 + workers: 3 + aws: + region: + sts_role_arn: + sink: + - stdout: diff --git a/data-prepper-plugins/sqs-source-new/build.gradle b/data-prepper-plugins/sqs-source-new/build.gradle new file mode 100644 index 0000000000..b4ffbc8e5e --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/build.gradle @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:buffer-common') + implementation project(':data-prepper-plugins:common') + implementation libs.armeria.core + implementation project(':data-prepper-plugins:aws-plugin-api') + implementation 'software.amazon.awssdk:sqs' + implementation 'software.amazon.awssdk:arns' + implementation 'software.amazon.awssdk:sts' + implementation 'io.micrometer:micrometer-core' + implementation 'com.fasterxml.jackson.core:jackson-annotations' + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final' + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + testImplementation project(':data-prepper-plugins:blocking-buffer') +} +test { + useJUnitPlatform() +} diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/AwsAuthenticationAdapter.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/AwsAuthenticationAdapter.java new file mode 100644 index 0000000000..c8a5265ed7 --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/AwsAuthenticationAdapter.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.sqssourcenew; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; + +class AwsAuthenticationAdapter { + private final AwsCredentialsSupplier awsCredentialsSupplier; + private final SqsSourceConfig sqsSourceConfig; + + + AwsAuthenticationAdapter( + final AwsCredentialsSupplier awsCredentialsSupplier, + final SqsSourceConfig sqsSourceConfig) { + this.awsCredentialsSupplier = awsCredentialsSupplier; + this.sqsSourceConfig = sqsSourceConfig; + } + + AwsCredentialsProvider getCredentialsProvider() { + final AwsAuthenticationOptions awsAuthenticationOptions = sqsSourceConfig.getAwsAuthenticationOptions(); + + final AwsCredentialsOptions options = AwsCredentialsOptions.builder() + .withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn()) + .withRegion(awsAuthenticationOptions.getAwsRegion()) + .withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides()) + .withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId()) + .build(); + + return awsCredentialsSupplier.getProvider(options); + } +} diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/AwsAuthenticationOptions.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/AwsAuthenticationOptions.java new file mode 100644 index 0000000000..c3edba5acd --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/AwsAuthenticationOptions.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + package org.opensearch.dataprepper.plugins.source.sqssourcenew; + + import com.fasterxml.jackson.annotation.JsonProperty; + import jakarta.validation.constraints.Size; + import software.amazon.awssdk.arns.Arn; + import software.amazon.awssdk.regions.Region; + + import java.util.Map; + import java.util.Optional; + + public class AwsAuthenticationOptions { + private static final String AWS_IAM_ROLE = "role"; + private static final String AWS_IAM = "iam"; + + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_external_id") + @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters") + private String awsStsExternalId; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + private void validateStsRoleArn() { + final Arn arn = getArn(); + if (!AWS_IAM.equals(arn.service())) { + throw new IllegalArgumentException("sts_role_arn must be an IAM Role"); + } + final Optional resourceType = arn.resource().resourceType(); + if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) { + throw new IllegalArgumentException("sts_role_arn must be an IAM Role"); + } + } + + private Arn getArn() { + try { + return Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn)); + } + } + + public String getAwsStsRoleArn() { + return awsStsRoleArn; + } + + public String getAwsStsExternalId() { + return awsStsExternalId; + } + + public Region getAwsRegion() { + return awsRegion != null ? Region.of(awsRegion) : null; + } + + public Map getAwsStsHeaderOverrides() { + return awsStsHeaderOverrides; + } + } \ No newline at end of file diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/QueueConfig.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/QueueConfig.java new file mode 100644 index 0000000000..938a4f74bd --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/QueueConfig.java @@ -0,0 +1,107 @@ +package org.opensearch.dataprepper.plugins.source.sqssourcenew; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import java.time.Duration; + +import org.hibernate.validator.constraints.time.DurationMax; +import org.hibernate.validator.constraints.time.DurationMin; + +public class QueueConfig { + + private static final int DEFAULT_MAXIMUM_MESSAGES = 10; + private static final Boolean DEFAULT_VISIBILITY_DUPLICATE_PROTECTION = false; + private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = Duration.ofSeconds(30); + private static final Duration DEFAULT_VISIBILITY_DUPLICATE_PROTECTION_TIMEOUT = Duration.ofHours(2); + private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20); + private static final Duration DEFAULT_POLL_DELAY_SECONDS = Duration.ofSeconds(0); + static final int DEFAULT_NUMBER_OF_WORKERS = 1; + private static final int DEFAULT_BATCH_SIZE = 10; + + @JsonProperty("url") + @NotNull + private String url; + + @JsonProperty("workers") + @Valid + private int numWorkers = DEFAULT_NUMBER_OF_WORKERS; + + @JsonProperty("maximum_messages") + @Min(1) + @Max(10) + private int maximumMessages = DEFAULT_MAXIMUM_MESSAGES; + + @JsonProperty("batch_size") + @Max(10) + private Integer batchSize = DEFAULT_BATCH_SIZE; + + @JsonProperty("polling_frequency") + private Duration pollingFrequency = Duration.ZERO; + + @JsonProperty("poll_delay") + @DurationMin(seconds = 0) + private Duration pollDelay = DEFAULT_POLL_DELAY_SECONDS; + + @JsonProperty("visibility_timeout") + @DurationMin(seconds = 0) + @DurationMax(seconds = 43200) + private Duration visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT_SECONDS; + + @JsonProperty("visibility_duplication_protection") + private Boolean visibilityDuplicateProtection = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION; + + @JsonProperty("visibility_duplicate_protection_timeout") + @DurationMin(seconds = 30) + @DurationMax(hours = 24) + private Duration visibilityDuplicateProtectionTimeout = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION_TIMEOUT; + + @JsonProperty("wait_time") + @DurationMin(seconds = 0) + @DurationMax(seconds = 20) + private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS; + + public String getUrl() { + return url; + } + + public Duration getPollingFrequency() { + return pollingFrequency; + } + + public Integer getBatchSize() { + return batchSize; + } + + public int getMaximumMessages() { + return maximumMessages; + } + + public int getNumWorkers() { + return numWorkers; + } + + public Duration getVisibilityTimeout() { + return visibilityTimeout; + } + + public Boolean getVisibilityDuplicateProtection() { + return visibilityDuplicateProtection; + } + + public Duration getVisibilityDuplicateProtectionTimeout() { + return visibilityDuplicateProtectionTimeout; + } + + public Duration getWaitTime() { + return waitTime; + } + + public Duration getPollDelay() { + return pollDelay; + } +} + diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/RawSqsMessageHandler.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/RawSqsMessageHandler.java new file mode 100644 index 0000000000..43175d1ad9 --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/RawSqsMessageHandler.java @@ -0,0 +1,80 @@ +package org.opensearch.dataprepper.plugins.source.sqssourcenew; + +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.model.Message; +import java.util.Map; +import java.util.Objects; + +/** + * Implements the SqsMessageHandler to read and parse SQS messages generically and push to buffer. + */ +public class RawSqsMessageHandler implements SqsMessageHandler { + + private static final Logger LOG = LoggerFactory.getLogger(RawSqsMessageHandler.class); + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Processes the SQS message, attempting to parse it as JSON, and adds it to the buffer. + * + * @param message - the SQS message for processing + * @param bufferAccumulator - the buffer accumulator + * @param acknowledgementSet - the acknowledgement set for end-to-end acknowledgements + */ + @Override + public void handleMessage(final Message message, + final BufferAccumulator> bufferAccumulator, + final AcknowledgementSet acknowledgementSet) { + try { + Event event; + + String messageBody = message.body(); + + // Default is to try to parse the message body as JSON + try { + JsonNode jsonNode = objectMapper.readTree(messageBody); + + event = JacksonEvent.builder() + .withEventType("sqs-event") + .withData(jsonNode) + .build(); + + } catch (Exception e) { + // Treat the message as plain text if json parsing doesn't work + LOG.debug("Message body is not valid JSON. Treating as plain text."); + event = JacksonEvent.builder() + .withEventType("sqs-event") + .withData(Map.of("message", messageBody)) + .build(); + } + + if (Objects.nonNull(acknowledgementSet)) { + acknowledgementSet.add(event); + } + + bufferAccumulator.add(new Record<>(event)); + + } catch (Exception e) { + LOG.error("Error processing SQS message: {}", e.getMessage(), e); + throw new RuntimeException(e); + } + + try { + bufferAccumulator.flush(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsEventProcessor.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsEventProcessor.java new file mode 100644 index 0000000000..e3e9722baa --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsEventProcessor.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.sqssourcenew; + +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import software.amazon.awssdk.services.sqs.model.Message; + +import java.io.IOException; + +public class SqsEventProcessor { + private final SqsMessageHandler sqsMessageHandler; + SqsEventProcessor(final SqsMessageHandler sqsMessageHandler) { + this.sqsMessageHandler= sqsMessageHandler; + } + + void addSqsObject(final Message message, + final BufferAccumulator> bufferAccumulator, + final AcknowledgementSet acknowledgementSet) throws IOException { + sqsMessageHandler.handleMessage(message, bufferAccumulator, acknowledgementSet); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsMessageHandler.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsMessageHandler.java new file mode 100644 index 0000000000..61911118d7 --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsMessageHandler.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.sqssourcenew; + +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.io.IOException; + +public interface SqsMessageHandler { + void handleMessage(final Message message, + final BufferAccumulator> bufferAccumulator, + final AcknowledgementSet acknowledgementSet) throws IOException ; +} diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsRetriesExhaustedException.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsRetriesExhaustedException.java new file mode 100644 index 0000000000..08cd673bed --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsRetriesExhaustedException.java @@ -0,0 +1,13 @@ +package org.opensearch.dataprepper.plugins.source.sqssourcenew; + +/** + * This exception is thrown when SQS retries are exhausted + * + * @since 2.1 + */ +public class SqsRetriesExhaustedException extends RuntimeException { + + public SqsRetriesExhaustedException(final String errorMessage) { + super(errorMessage); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsService.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsService.java new file mode 100644 index 0000000000..20bc15e9ce --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsService.java @@ -0,0 +1,128 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + package org.opensearch.dataprepper.plugins.source.sqssourcenew; + + import com.linecorp.armeria.client.retry.Backoff; + import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; + import org.opensearch.dataprepper.metrics.PluginMetrics; + import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; + import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; + import software.amazon.awssdk.core.retry.RetryPolicy; + import software.amazon.awssdk.services.sqs.SqsClient; + import org.opensearch.dataprepper.buffer.common.BufferAccumulator; + import org.opensearch.dataprepper.model.buffer.Buffer; + import org.opensearch.dataprepper.model.event.Event; + import org.opensearch.dataprepper.model.record.Record; + import java.time.Duration; + import java.util.ArrayList; + import java.util.List; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.Executors; + import java.util.concurrent.ExecutorService; + import java.util.stream.Collectors; + import java.util.stream.IntStream; + + public class SqsService { + private static final Logger LOG = LoggerFactory.getLogger(SqsService.class); + static final long SHUTDOWN_TIMEOUT = 30L; + static final long INITIAL_DELAY = Duration.ofSeconds(20).toMillis(); + static final long MAXIMUM_DELAY = Duration.ofMinutes(5).toMillis(); + static final double JITTER_RATE = 0.20; + + private final SqsSourceConfig sqsSourceConfig; + private final SqsEventProcessor sqsEventProcessor; + private final SqsClient sqsClient; + private final PluginMetrics pluginMetrics; + private final AcknowledgementSetManager acknowledgementSetManager; + private final List allSqsUrlExecutorServices; + private final List sqsWorkers; + private final BufferAccumulator> bufferAccumulator; + + public SqsService(final Buffer> buffer, + final AcknowledgementSetManager acknowledgementSetManager, + final SqsSourceConfig sqsSourceConfig, + final SqsEventProcessor sqsEventProcessor, + final PluginMetrics pluginMetrics, + final AwsCredentialsProvider credentialsProvider) { + + this.sqsSourceConfig = sqsSourceConfig; + this.sqsEventProcessor = sqsEventProcessor; + this.pluginMetrics = pluginMetrics; + this.acknowledgementSetManager = acknowledgementSetManager; + this.allSqsUrlExecutorServices = new ArrayList<>(); + this.sqsWorkers = new ArrayList<>(); + this.sqsClient = createSqsClient(credentialsProvider); + this.bufferAccumulator = BufferAccumulator.create(buffer, sqsSourceConfig.getNumberOfRecordsToAccumulate(), sqsSourceConfig.getBufferTimeout()); + + } + + + public void start() { + final Backoff backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE) + .withMaxAttempts(Integer.MAX_VALUE); + + LOG.info("Starting SqsService"); + + sqsSourceConfig.getQueues().forEach(queueConfig -> { + String queueUrl = queueConfig.getUrl(); + int numWorkers = queueConfig.getNumWorkers(); + ExecutorService executorService = Executors.newFixedThreadPool( + numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source-new-" + queueUrl)); + allSqsUrlExecutorServices.add(executorService); + List workers = IntStream.range(0, numWorkers) + .mapToObj(i -> new SqsWorker( + bufferAccumulator, + acknowledgementSetManager, + sqsClient, + sqsEventProcessor, + sqsSourceConfig, + queueConfig, + pluginMetrics, + backoff)) + .collect(Collectors.toList()); + + sqsWorkers.addAll(workers); + workers.forEach(executorService::submit); + LOG.info("Started SQS workers for queue {} with {} workers", queueUrl, numWorkers); + }); + } + + SqsClient createSqsClient(final AwsCredentialsProvider credentialsProvider) { + LOG.debug("Creating SQS client"); + return SqsClient.builder() + .region(sqsSourceConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(credentialsProvider) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy(RetryPolicy.builder().numRetries(5).build()) + .build()) + .build(); + } + + public void stop() { + allSqsUrlExecutorServices.forEach(ExecutorService::shutdown); + sqsWorkers.forEach(SqsWorker::stop); + allSqsUrlExecutorServices.forEach(executorService -> { + try { + if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) { + LOG.warn("Failed to terminate SqsWorkers"); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.error("Interrupted during shutdown, exiting uncleanly...", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + }); + + sqsClient.close(); + LOG.info("SqsService shutdown completed."); + } + + } + \ No newline at end of file diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsSource.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsSource.java new file mode 100644 index 0000000000..966f28f230 --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsSource.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.sqssourcenew; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import java.util.Objects; + +@DataPrepperPlugin(name = "sqs-source-new", pluginType = Source.class,pluginConfigurationType = SqsSourceConfig.class) +public class SqsSource implements Source> { + + private final PluginMetrics pluginMetrics; + private final SqsSourceConfig sqsSourceConfig; + private SqsService sqsService; + private final AcknowledgementSetManager acknowledgementSetManager; + private final AwsCredentialsSupplier awsCredentialsSupplier; + private final boolean acknowledgementsEnabled; + + + @DataPrepperPluginConstructor + public SqsSource(final PluginMetrics pluginMetrics, + final SqsSourceConfig sqsSourceConfig, + final AcknowledgementSetManager acknowledgementSetManager, + final AwsCredentialsSupplier awsCredentialsSupplier) { + + this.pluginMetrics = pluginMetrics; + this.sqsSourceConfig = sqsSourceConfig; + this.acknowledgementsEnabled = sqsSourceConfig.getAcknowledgements(); + this.acknowledgementSetManager = acknowledgementSetManager; + this.awsCredentialsSupplier = awsCredentialsSupplier; + + } + + @Override + public void start(Buffer> buffer) { + if (buffer == null) { + throw new IllegalStateException("Buffer is null"); + } + final AwsAuthenticationAdapter awsAuthenticationAdapter = new AwsAuthenticationAdapter(awsCredentialsSupplier, sqsSourceConfig); + final AwsCredentialsProvider credentialsProvider = awsAuthenticationAdapter.getCredentialsProvider(); + final SqsMessageHandler rawSqsMessageHandler = new RawSqsMessageHandler(); + final SqsEventProcessor sqsEventProcessor = new SqsEventProcessor(rawSqsMessageHandler); + sqsService = new SqsService(buffer, acknowledgementSetManager, sqsSourceConfig, sqsEventProcessor, pluginMetrics, credentialsProvider); + sqsService.start(); + } + + @Override + public boolean areAcknowledgementsEnabled() { + return acknowledgementsEnabled; + } + + @Override + public void stop() { + if (Objects.nonNull(sqsService)) { + sqsService.stop(); + } + } +} diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsSourceConfig.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsSourceConfig.java new file mode 100644 index 0000000000..3377922e42 --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsSourceConfig.java @@ -0,0 +1,52 @@ +package org.opensearch.dataprepper.plugins.source.sqssourcenew; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import java.time.Duration; +import java.util.List; + +public class SqsSourceConfig { + + static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(10); + static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 100; + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("acknowledgments") + private boolean acknowledgments = false; + + @JsonProperty("buffer_timeout") + private Duration bufferTimeout = DEFAULT_BUFFER_TIMEOUT; + + @JsonProperty("records_to_accumulate") + private int numberOfRecordsToAccumulate = DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE; + + @JsonProperty("queues") + @NotNull + @Valid + private List queues; + + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + public int getNumberOfRecordsToAccumulate() { + return numberOfRecordsToAccumulate; + } + + public boolean getAcknowledgements() { + return acknowledgments; + } + + public Duration getBufferTimeout() { + return bufferTimeout; + } + + public List getQueues() { + return queues; + } +} diff --git a/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsWorker.java b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsWorker.java new file mode 100644 index 0000000000..ae2d1d1e8b --- /dev/null +++ b/data-prepper-plugins/sqs-source-new/src/main/java/org/opensearch/dataprepper/plugins/source/sqssourcenew/SqsWorker.java @@ -0,0 +1,311 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.sqssourcenew; + +import com.linecorp.armeria.client.retry.Backoff; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.SqsException; +import software.amazon.awssdk.services.sts.model.StsException; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + + +public class SqsWorker implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(SqsWorker.class); + static final String SQS_MESSAGES_RECEIVED_METRIC_NAME = "sqsMessagesReceived"; + static final String SQS_MESSAGES_DELETED_METRIC_NAME = "sqsMessagesDeleted"; + static final String SQS_MESSAGES_FAILED_METRIC_NAME = "sqsMessagesFailed"; + static final String SQS_MESSAGES_DELETE_FAILED_METRIC_NAME = "sqsMessagesDeleteFailed"; + static final String SQS_MESSAGE_DELAY_METRIC_NAME = "sqsMessageDelay"; + static final String SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangedCount"; + static final String SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangeFailedCount"; + static final String ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; + + private final SqsClient sqsClient; + private final SqsEventProcessor sqsEventProcessor; + private final Counter sqsMessagesReceivedCounter; + private final Counter sqsMessagesDeletedCounter; + private final Counter sqsMessagesFailedCounter; + private final Counter sqsMessagesDeleteFailedCounter; + private final Counter acknowledgementSetCallbackCounter; + private final Counter sqsVisibilityTimeoutChangedCount; + private final Counter sqsVisibilityTimeoutChangeFailedCount; + private final Timer sqsMessageDelayTimer; + private final Backoff standardBackoff; + private final QueueConfig queueConfig; + private int failedAttemptCount; + private final boolean endToEndAcknowledgementsEnabled; + private final AcknowledgementSetManager acknowledgementSetManager; + private volatile boolean isStopped = false; + private final BufferAccumulator> bufferAccumulator; + private Map messageVisibilityTimesMap; + + public SqsWorker(final BufferAccumulator> bufferAccumulator, + final AcknowledgementSetManager acknowledgementSetManager, + final SqsClient sqsClient, + final SqsEventProcessor sqsEventProcessor, + final SqsSourceConfig sqsSourceConfig, + final QueueConfig queueConfig, + final PluginMetrics pluginMetrics, + final Backoff backoff) { + + this.bufferAccumulator = bufferAccumulator; + this.sqsClient = sqsClient; + this.sqsEventProcessor = sqsEventProcessor; + this.queueConfig = queueConfig; + this.acknowledgementSetManager = acknowledgementSetManager; + this.standardBackoff = backoff; + this.endToEndAcknowledgementsEnabled = sqsSourceConfig.getAcknowledgements(); + messageVisibilityTimesMap = new HashMap<>(); + + failedAttemptCount = 0; + + sqsMessagesReceivedCounter = pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME); + sqsMessagesDeletedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETED_METRIC_NAME); + sqsMessagesFailedCounter = pluginMetrics.counter(SQS_MESSAGES_FAILED_METRIC_NAME); + sqsMessagesDeleteFailedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETE_FAILED_METRIC_NAME); + sqsMessageDelayTimer = pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME); + acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME); + sqsVisibilityTimeoutChangedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME); + sqsVisibilityTimeoutChangeFailedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME); + } + + @Override + public void run() { + while (!isStopped) { + int messagesProcessed = 0; + try { + messagesProcessed = processSqsMessages(); + } catch (final Exception e) { + LOG.error("Unable to process SQS messages. Processing error due to: {}", e.getMessage()); + // There shouldn't be any exceptions caught here, but added backoff just to control the amount of logging in case of an exception is thrown. + applyBackoff(); + } + + if (messagesProcessed > 0 && queueConfig.getPollDelay().toMillis() > 0) { + try { + Thread.sleep(queueConfig.getPollDelay().toMillis()); + } catch (final InterruptedException e) { + LOG.error("Thread is interrupted while polling SQS.", e); + } + } + } + } + + int processSqsMessages() { + final List messages = getMessagesFromSqs(); + if (!messages.isEmpty()) { + sqsMessagesReceivedCounter.increment(messages.size()); + final List deleteMessageBatchRequestEntries = processSqsEvents(messages); + if (!deleteMessageBatchRequestEntries.isEmpty()) { + deleteSqsMessages(deleteMessageBatchRequestEntries); + } + } + return messages.size(); + } + + private List getMessagesFromSqs() { + try { + final ReceiveMessageRequest request = createReceiveMessageRequest(); + final List messages = sqsClient.receiveMessage(request).messages(); + failedAttemptCount = 0; + return messages; + } catch (final SqsException | StsException e) { + LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage()); + applyBackoff(); + return Collections.emptyList(); + } + } + + private void applyBackoff() { + final long delayMillis = standardBackoff.nextDelayMillis(++failedAttemptCount); + if (delayMillis < 0) { + Thread.currentThread().interrupt(); + throw new SqsRetriesExhaustedException("SQS retries exhausted. Make sure that SQS configuration is valid, SQS queue exists, and IAM role has required permissions."); + } + final Duration delayDuration = Duration.ofMillis(delayMillis); + LOG.info("Pausing SQS processing for {}.{} seconds due to an error in processing.", + delayDuration.getSeconds(), delayDuration.toMillisPart()); + try { + Thread.sleep(delayMillis); + } catch (final InterruptedException e){ + LOG.error("Thread is interrupted while polling SQS with retry.", e); + } + } + + + private ReceiveMessageRequest createReceiveMessageRequest() { + return ReceiveMessageRequest.builder() + .queueUrl(queueConfig.getUrl()) + .maxNumberOfMessages(queueConfig.getMaximumMessages()) + .visibilityTimeout((int) queueConfig.getVisibilityTimeout().getSeconds()) + .waitTimeSeconds((int) queueConfig.getWaitTime().getSeconds()) + .build(); + } + + private List processSqsEvents(final List messages) { + final List deleteMessageBatchRequestEntryCollection = new ArrayList<>(); + final Map messageAcknowledgementSetMap = new HashMap<>(); + final Map> messageWaitingForAcknowledgementsMap = new HashMap<>(); + + for (Message message : messages) { + List waitingForAcknowledgements = new ArrayList<>(); + AcknowledgementSet acknowledgementSet = null; + final int visibilityTimeout = (int)queueConfig.getVisibilityTimeout().getSeconds(); + final int maxVisibilityTimeout = (int)queueConfig.getVisibilityDuplicateProtectionTimeout().getSeconds(); + final int progressCheckInterval = visibilityTimeout/2 - 1; + if (endToEndAcknowledgementsEnabled) { + int expiryTimeout = visibilityTimeout - 2; + final boolean visibilityDuplicateProtectionEnabled = queueConfig.getVisibilityDuplicateProtection(); + if (visibilityDuplicateProtectionEnabled) { + expiryTimeout = maxVisibilityTimeout; + } + acknowledgementSet = acknowledgementSetManager.create( + (result) -> { + acknowledgementSetCallbackCounter.increment(); + // Delete only if this is positive acknowledgement + if (visibilityDuplicateProtectionEnabled) { + messageVisibilityTimesMap.remove(message); + } + if (result == true) { + deleteSqsMessages(waitingForAcknowledgements); + } + }, + Duration.ofSeconds(expiryTimeout)); + if (visibilityDuplicateProtectionEnabled) { + acknowledgementSet.addProgressCheck( + (ratio) -> { + final int newVisibilityTimeoutSeconds = visibilityTimeout; + int newValue = messageVisibilityTimesMap.getOrDefault(message, visibilityTimeout) + progressCheckInterval; + if (newValue >= maxVisibilityTimeout) { + return; + } + messageVisibilityTimesMap.put(message, newValue); + increaseVisibilityTimeout(message, newVisibilityTimeoutSeconds); + }, + Duration.ofSeconds(progressCheckInterval)); + } + messageAcknowledgementSetMap.put(message, acknowledgementSet); + messageWaitingForAcknowledgementsMap.put(message, waitingForAcknowledgements); + } + } + + if (endToEndAcknowledgementsEnabled) { + LOG.debug("Created acknowledgement sets for {} messages.", messages.size()); + } + + for (Message message : messages) { + final AcknowledgementSet acknowledgementSet = messageAcknowledgementSetMap.get(message); + final List waitingForAcknowledgements = messageWaitingForAcknowledgementsMap.get(message); + final Optional deleteMessageBatchRequestEntry = processSqsObject(message, acknowledgementSet); + if (endToEndAcknowledgementsEnabled) { + deleteMessageBatchRequestEntry.ifPresent(waitingForAcknowledgements::add); + acknowledgementSet.complete(); + } else { + deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add); + } + } + + return deleteMessageBatchRequestEntryCollection; + } + + + private Optional processSqsObject( + final Message message, + final AcknowledgementSet acknowledgementSet) { + try { + sqsEventProcessor.addSqsObject(message, bufferAccumulator, acknowledgementSet); + // TODO + return Optional.of(buildDeleteMessageBatchRequestEntry(message)); + } catch (final Exception e) { + LOG.error("Error processing from S3: {}. Retrying with exponential backoff.", e.getMessage()); + applyBackoff(); + return Optional.empty(); + } + } + + private void increaseVisibilityTimeout(final Message message, final int newVisibilityTimeoutSeconds) { + if(isStopped) { + LOG.info("Some messages are pending completion of acknowledgments. Data Prepper will not increase the visibility timeout because it is shutting down. {}", message); + return; + } + final ChangeMessageVisibilityRequest changeMessageVisibilityRequest = ChangeMessageVisibilityRequest.builder() + .visibilityTimeout(newVisibilityTimeoutSeconds) + .queueUrl(queueConfig.getUrl()) + .receiptHandle(message.receiptHandle()) + .build(); + + try { + sqsClient.changeMessageVisibility(changeMessageVisibilityRequest); + sqsVisibilityTimeoutChangedCount.increment(); + LOG.debug("Set visibility timeout for message {} to {}", message.messageId(), newVisibilityTimeoutSeconds); + } catch (Exception e) { + LOG.error("Failed to set visibility timeout for message {} to {}", message.messageId(), newVisibilityTimeoutSeconds, e); + sqsVisibilityTimeoutChangeFailedCount.increment(); + } + } + + + private DeleteMessageBatchRequestEntry buildDeleteMessageBatchRequestEntry(Message message) { + return DeleteMessageBatchRequestEntry.builder() + .id(message.messageId()) + .receiptHandle(message.receiptHandle()) + .build(); + } + + private void deleteSqsMessages(final List deleteEntries) { + if (deleteEntries.isEmpty()) return; + + try { + DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder() + .queueUrl(queueConfig.getUrl()) + .entries(deleteEntries) + .build(); + DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(deleteRequest); + + if (response.hasSuccessful()) { + int successfulDeletes = response.successful().size(); + sqsMessagesDeletedCounter.increment(successfulDeletes); + } + if (response.hasFailed()) { + int failedDeletes = response.failed().size(); + sqsMessagesDeleteFailedCounter.increment(failedDeletes); + LOG.error("Failed to delete {} messages from SQS.", failedDeletes); + } + } catch (SdkException e) { + LOG.error("Failed to delete messages from SQS: {}", e.getMessage()); + sqsMessagesDeleteFailedCounter.increment(deleteEntries.size()); + } + } + + void stop() { + isStopped = true; + } +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index f8fc52a4d3..2c148c59d7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -166,7 +166,7 @@ include 'data-prepper-plugins:obfuscate-processor' include 'data-prepper-plugins:parquet-codecs' include 'data-prepper-plugins:aws-sqs-common' include 'data-prepper-plugins:buffer-common' -//include 'data-prepper-plugins:sqs-source' +include 'data-prepper-plugins:sqs-source' //include 'data-prepper-plugins:cloudwatch-logs' //include 'data-prepper-plugins:http-sink' //include 'data-prepper-plugins:sns-sink' @@ -189,4 +189,5 @@ include 'data-prepper-plugins:opensearch-api-source' include 'data-prepper-plugins:saas-source-plugins' include 'data-prepper-plugins:saas-source-plugins:source-crawler' include 'data-prepper-plugins:saas-source-plugins:jira-source' +include 'data-prepper-plugins:sqs-source-new'