diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java new file mode 100644 index 0000000000..0127a4fadb --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsDispatcher; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsService; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.exception.InvalidBufferTypeException; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; + +import java.util.Collection; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +@DataPrepperPlugin(name = "cloudwatch_logs", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class) +public class CloudWatchLogsSink extends AbstractSink> { + private final CloudWatchLogsService cloudWatchLogsService; + private volatile boolean isInitialized; + @DataPrepperPluginConstructor + public CloudWatchLogsSink(final PluginSetting pluginSetting, + final PluginMetrics pluginMetrics, + final CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig, + final AwsCredentialsSupplier awsCredentialsSupplier) { + super(pluginSetting); + + AwsConfig awsConfig = cloudWatchLogsSinkConfig.getAwsConfig(); + ThresholdConfig thresholdConfig = cloudWatchLogsSinkConfig.getThresholdConfig(); + + CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics); + CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), + thresholdConfig.getMaxEventSizeBytes(), + thresholdConfig.getMaxRequestSize(),thresholdConfig.getLogSendInterval()); + + CloudWatchLogsClient cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier); + + BufferFactory bufferFactory = null; + if (cloudWatchLogsSinkConfig.getBufferType().equals("in_memory")) { + bufferFactory = new InMemoryBufferFactory(); + } + + Executor executor = Executors.newCachedThreadPool(); + + CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder() + .cloudWatchLogsClient(cloudWatchLogsClient) + .cloudWatchLogsMetrics(cloudWatchLogsMetrics) + .logGroup(cloudWatchLogsSinkConfig.getLogGroup()) + .logStream(cloudWatchLogsSinkConfig.getLogStream()) + .backOffTimeBase(thresholdConfig.getBackOffTime()) + .retryCount(thresholdConfig.getRetryCount()) + .executor(executor) + .build(); + + Buffer buffer; + try { + buffer = bufferFactory.getBuffer(); + } catch (NullPointerException e) { + throw new InvalidBufferTypeException("Error loading buffer!"); + } + + cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsLimits, cloudWatchLogsDispatcher); + } + + @Override + public void doInitialize() { + isInitialized = Boolean.TRUE; + } + + @Override + public void doOutput(Collection> records) { + if (records.isEmpty()) { + return; + } + + cloudWatchLogsService.processLogEvents(records); + } + + @Override + public boolean isReady() { + return isInitialized; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/Buffer.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/Buffer.java similarity index 92% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/Buffer.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/Buffer.java index 3c3ccf9f77..1de3c31633 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/Buffer.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/Buffer.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.buffer; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer; import java.util.List; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/BufferFactory.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/BufferFactory.java similarity index 78% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/BufferFactory.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/BufferFactory.java index 574912e5e9..c87fcf230c 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/BufferFactory.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/BufferFactory.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.buffer; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer; /** * BufferFactory will act as a means for decoupling the rest of diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBuffer.java similarity index 94% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBuffer.java index 8716915ee7..855efad146 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBuffer.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBuffer.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.buffer; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer; import java.util.ArrayList; import java.util.Collections; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferFactory.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBufferFactory.java similarity index 76% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferFactory.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBufferFactory.java index 2253931af0..3aa9a12a71 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferFactory.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBufferFactory.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.buffer; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer; public class InMemoryBufferFactory implements BufferFactory{ @Override diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactory.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactory.java similarity index 93% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactory.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactory.java index fbf3d83ba6..925fc2c46a 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactory.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactory.java @@ -3,11 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.client; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.plugins.sink.config.AwsConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java similarity index 98% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java index 58de7c7068..102b511f14 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.client; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import lombok.Builder; import org.opensearch.dataprepper.model.event.EventHandle; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsMetrics.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java similarity index 96% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsMetrics.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java index bee439e855..917aae3646 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsMetrics.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.client; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.metrics.PluginMetrics; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java similarity index 92% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java index 38d1b32f9a..a27d9ea4cb 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsService.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java @@ -3,14 +3,14 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.client; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.sink.buffer.Buffer; -import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits; -import org.opensearch.dataprepper.plugins.sink.utils.SinkStopWatch; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.SinkStopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/AwsConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/AwsConfig.java similarity index 95% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/AwsConfig.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/AwsConfig.java index cb1cb3ae5f..825ed85fc6 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/AwsConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/AwsConfig.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.config; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Size; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java similarity index 94% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/CloudWatchLogsSinkConfig.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index 6d9088d4e2..3cade26ae9 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.config; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/ThresholdConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java similarity index 97% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/ThresholdConfig.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java index b244b9a9da..8b974cb81b 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/ThresholdConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.config; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Size; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/exception/InvalidBufferTypeException.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/exception/InvalidBufferTypeException.java new file mode 100644 index 0000000000..50d75ecd29 --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/exception/InvalidBufferTypeException.java @@ -0,0 +1,7 @@ +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.exception; + +public class InvalidBufferTypeException extends RuntimeException { + public InvalidBufferTypeException(String message) { + super(message); + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/CloudWatchLogsLimits.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimits.java similarity index 98% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/CloudWatchLogsLimits.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimits.java index f7f382dd8d..073daba7fd 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/CloudWatchLogsLimits.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimits.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.utils; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils; /** * ThresholdCheck receives parameters for which to reference the * limits of a buffer and CloudWatchLogsClient before making a diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/SinkStopWatch.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/SinkStopWatch.java similarity index 94% rename from data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/SinkStopWatch.java rename to data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/SinkStopWatch.java index 51b77a1ab6..e03fd22166 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/utils/SinkStopWatch.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/SinkStopWatch.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.utils; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils; import org.apache.commons.lang3.time.StopWatch; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java new file mode 100644 index 0000000000..99c454ae57 --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; + +import java.util.ArrayList; +import java.util.Collection; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class CloudWatchLogsSinkTest { + private PluginSetting mockPluginSetting; + private PluginMetrics mockPluginMetrics; + private CloudWatchLogsSinkConfig mockCloudWatchLogsSinkConfig; + private AwsCredentialsSupplier mockCredentialSupplier; + private AwsConfig mockAwsConfig; + private ThresholdConfig thresholdConfig; + private CloudWatchLogsMetrics mockCloudWatchLogsMetrics; + private CloudWatchLogsClient mockClient; + private static final String TEST_LOG_GROUP = "testLogGroup"; + private static final String TEST_LOG_STREAM= "testLogStream"; + private static final String TEST_PLUGIN_NAME = "testPluginName"; + private static final String TEST_PIPELINE_NAME = "testPipelineName"; + private static final String TEST_BUFFER_TYPE = "in_memory"; + @BeforeEach + void setUp() { + mockPluginSetting = mock(PluginSetting.class); + mockPluginMetrics = mock(PluginMetrics.class); + mockCloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class); + mockCredentialSupplier = mock(AwsCredentialsSupplier.class); + mockAwsConfig = mock(AwsConfig.class); + thresholdConfig = new ThresholdConfig(); + mockCloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class); + mockClient = mock(CloudWatchLogsClient.class); + + when(mockCloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(mockAwsConfig); + when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig); + when(mockCloudWatchLogsSinkConfig.getLogGroup()).thenReturn(TEST_LOG_GROUP); + when(mockCloudWatchLogsSinkConfig.getLogStream()).thenReturn(TEST_LOG_STREAM); + when(mockCloudWatchLogsSinkConfig.getBufferType()).thenReturn(TEST_BUFFER_TYPE); + + when(mockPluginSetting.getName()).thenReturn(TEST_PLUGIN_NAME); + when(mockPluginSetting.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + } + + CloudWatchLogsSink getTestCloudWatchSink() { + return new CloudWatchLogsSink(mockPluginSetting, mockPluginMetrics, mockCloudWatchLogsSinkConfig, + mockCredentialSupplier); + } + + Collection> getMockedRecords() { + Collection> testCollection = new ArrayList<>(); + Record mockedEvent = new Record<>(JacksonEvent.fromMessage("")); + Record spyEvent = spy(mockedEvent); + + testCollection.add(spyEvent); + + return testCollection; + } + + @Test + void WHEN_sink_is_initialized_THEN_sink_is_ready_returns_true() { + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class))) + .thenReturn(mockClient); + + CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink(); + testCloudWatchSink.doInitialize(); + assertTrue(testCloudWatchSink.isReady()); + } + } + + @Test + void WHEN_given_sample_empty_records_THEN_records_are_processed() { + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class))) + .thenReturn(mockClient); + + CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink(); + testCloudWatchSink.doInitialize(); + Collection> spyEvents = getMockedRecords(); + + testCloudWatchSink.doOutput(spyEvents); + + for (Record spyEvent : spyEvents) { + verify(spyEvent, atLeast(1)).getData(); + } + } + } + + @Test + void WHEN_given_sample_empty_records_THEN_records_are_not_processed() { + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class))) + .thenReturn(mockClient); + + CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink(); + testCloudWatchSink.doInitialize(); + Collection> spyEvents = spy(ArrayList.class); + + assertTrue(spyEvents.isEmpty()); + + testCloudWatchSink.doOutput(spyEvents); + verify(spyEvents, times(2)).isEmpty(); + } + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferFactoryTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBufferFactoryTest.java similarity index 88% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferFactoryTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBufferFactoryTest.java index 3419788aed..ee53e8ba8f 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferFactoryTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBufferFactoryTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.buffer; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer; import org.junit.jupiter.api.Test; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBufferTest.java similarity index 98% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBufferTest.java index b88b9e7826..dfd031ce74 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/buffer/InMemoryBufferTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/buffer/InMemoryBufferTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.buffer; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactoryTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactoryTest.java similarity index 96% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactoryTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactoryTest.java index 32727254aa..ef822416f4 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsClientFactoryTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactoryTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.client; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -12,7 +12,7 @@ import org.mockito.MockedStatic; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.plugins.sink.config.AwsConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.regions.Region; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java similarity index 93% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcherTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java index 7ca4ae098b..38ff027ac3 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java @@ -3,12 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.client; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; @@ -25,7 +25,7 @@ class CloudWatchLogsDispatcherTest { private CloudWatchLogsDispatcher cloudWatchLogsDispatcher; private CloudWatchLogsClient mockCloudWatchLogsClient; - private CloudWatchLogsMetrics mockCloudWatchLogsMetrics; + private CloudWatchLogsMetrics mockCloudWatchLogsMetrics; private Executor mockExecutor; private static final String LOG_GROUP = "testGroup"; private static final String LOG_STREAM = "testStream"; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsMetricsTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java similarity index 97% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsMetricsTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java index 2e19e2aa54..528034d373 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsMetricsTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.sink.client; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java similarity index 92% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java index a0864d2207..f984a44d89 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.client; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -11,12 +11,12 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.sink.buffer.Buffer; -import org.opensearch.dataprepper.plugins.sink.buffer.InMemoryBuffer; -import org.opensearch.dataprepper.plugins.sink.buffer.InMemoryBufferFactory; -import org.opensearch.dataprepper.plugins.sink.config.CloudWatchLogsSinkConfig; -import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; -import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBuffer; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import java.util.ArrayList; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/UploaderTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/UploaderTest.java similarity index 94% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/UploaderTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/UploaderTest.java index 56703ebc51..19629b2952 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/UploaderTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/UploaderTest.java @@ -1,9 +1,9 @@ -package org.opensearch.dataprepper.plugins.sink.client; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; @@ -21,7 +21,7 @@ class UploaderTest { private CloudWatchLogsClient mockCloudWatchLogsClient; - private CloudWatchLogsMetrics mockCloudWatchLogsMetrics; + private CloudWatchLogsMetrics mockCloudWatchLogsMetrics; @BeforeEach void setUp() { diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/config/AwsConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/AwsConfigTest.java similarity index 98% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/config/AwsConfigTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/AwsConfigTest.java index edd379641c..e420e019e0 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/config/AwsConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/AwsConfigTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.config; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.BeforeEach; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/config/CloudWatchLogsSinkConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java similarity index 97% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/config/CloudWatchLogsSinkConfigTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java index 77ff49db8a..4158b15e7b 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/config/CloudWatchLogsSinkConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.config; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/config/ThresholdConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java similarity index 98% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/config/ThresholdConfigTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java index 59f8721d05..a636e68c33 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/config/ThresholdConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfigTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.config; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.BeforeEach; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/utils/CloudWatchLogsLimitsTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimitsTest.java similarity index 98% rename from data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/utils/CloudWatchLogsLimitsTest.java rename to data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimitsTest.java index 1d249ccbad..bae3b69bea 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/utils/CloudWatchLogsLimitsTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsLimitsTest.java @@ -3,11 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.utils; +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/data-prepper-plugins/common/build.gradle b/data-prepper-plugins/common/build.gradle index 9a2041e5c6..43b9a80924 100644 --- a/data-prepper-plugins/common/build.gradle +++ b/data-prepper-plugins/common/build.gradle @@ -21,7 +21,7 @@ dependencies { implementation 'io.micrometer:micrometer-core' testImplementation testLibs.junit.vintage implementation 'org.apache.parquet:parquet-common:1.12.3' - implementation 'org.xerial.snappy:snappy-java:1.1.10.1' + implementation 'org.xerial.snappy:snappy-java:1.1.10.3' testImplementation project(':data-prepper-plugins:blocking-buffer') testImplementation 'commons-io:commons-io:2.12.0' testImplementation testLibs.mockito.inline diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java index f0d2dfb27e..6e7f612534 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java @@ -118,7 +118,7 @@ public void setup() { when(jsonTopic.getName()).thenReturn(testTopic); when(jsonTopic.getGroupId()).thenReturn(testGroup); when(jsonTopic.getWorkers()).thenReturn(1); - when(jsonTopic.getSessionTimeOut()).thenReturn(15000); + when(jsonTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); when(jsonTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); when(jsonTopic.getAutoCommit()).thenReturn(false); when(jsonTopic.getSerdeFormat()).thenReturn(MessageFormat.JSON); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java index 6c49d3b2c3..6d52bba0ea 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceMultipleAuthTypeIT.java @@ -130,7 +130,7 @@ public void setup() { when(plainTextTopic.getName()).thenReturn(testTopic); when(plainTextTopic.getGroupId()).thenReturn(testGroup); when(plainTextTopic.getWorkers()).thenReturn(1); - when(plainTextTopic.getSessionTimeOut()).thenReturn(15000); + when(plainTextTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); when(plainTextTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); when(plainTextTopic.getAutoCommit()).thenReturn(false); when(plainTextTopic.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT); diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java index 9be6900a68..9eb222c496 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/MskGlueRegistryMultiTypeIT.java @@ -169,7 +169,7 @@ public void setup() { when(avroTopic.getAutoCommit()).thenReturn(false); when(avroTopic.getAutoOffsetReset()).thenReturn("earliest"); when(avroTopic.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); - when(avroTopic.getSessionTimeOut()).thenReturn(15000); + when(avroTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(5)); when(avroTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); when(jsonTopic.getName()).thenReturn(testTopic); when(jsonTopic.getGroupId()).thenReturn(testGroup); @@ -177,7 +177,6 @@ public void setup() { when(jsonTopic.getAutoCommit()).thenReturn(false); when(jsonTopic.getAutoOffsetReset()).thenReturn("earliest"); when(jsonTopic.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); - when(jsonTopic.getSessionTimeOut()).thenReturn(15000); when(jsonTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers"); testRegistryName = System.getProperty("tests.kafka.glue_registry_name"); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfig.java index ce413f1c10..b122e73d23 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfig.java @@ -19,13 +19,6 @@ public class PlainTextAuthConfig { @JsonProperty("password") private String password; - @JsonProperty("security_protocol") - private String securityProtocol; - - public String getSecurityProtocol() { - return securityProtocol; - } - public String getUsername() { return username; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java index e61344ec62..2f3ab61fcb 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java @@ -19,10 +19,10 @@ */ public class TopicConfig { private static final String AUTO_COMMIT = "false"; - private static final Duration AUTOCOMMIT_INTERVAL = Duration.ofSeconds(5); - private static final Integer SESSION_TIMEOUT = 45000; + private static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5); + private static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45); private static final int MAX_RETRY_ATTEMPT = Integer.MAX_VALUE; - private static final String AUTO_OFFSET_RESET = "earliest"; + static final String DEFAULT_AUTO_OFFSET_RESET = "latest"; static final Duration THREAD_WAITING_TIME = Duration.ofSeconds(5); private static final Duration MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4); private static final Duration BUFFER_DEFAULT_TIMEOUT = Duration.ofSeconds(5); @@ -33,8 +33,8 @@ public class TopicConfig { private static final Duration RETRY_BACKOFF = Duration.ofSeconds(100); private static final Duration MAX_POLL_INTERVAL = Duration.ofSeconds(300000); private static final Integer CONSUMER_MAX_POLL_RECORDS = 500; - private static final Integer NUM_OF_WORKERS = 5; - private static final Duration HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(3); + static final Integer DEFAULT_NUM_OF_WORKERS = 2; + static final Duration HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5); @JsonProperty("name") @NotNull @@ -49,7 +49,7 @@ public class TopicConfig { @JsonProperty("workers") @Valid @Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200") - private Integer workers = NUM_OF_WORKERS; + private Integer workers = DEFAULT_NUM_OF_WORKERS; @JsonProperty("max_retry_attempts") @Valid @@ -67,18 +67,18 @@ public class TopicConfig { @JsonProperty("auto_commit") private Boolean autoCommit = false; - @JsonProperty("auto_commit_interval") + @JsonProperty("commit_interval") @Valid @Size(min = 1) - private Duration autoCommitInterval = AUTOCOMMIT_INTERVAL; + private Duration commitInterval = DEFAULT_COMMIT_INTERVAL; @JsonProperty("session_timeout") @Valid @Size(min = 1) - private Integer sessionTimeOut = SESSION_TIMEOUT; + private Duration sessionTimeOut = DEFAULT_SESSION_TIMEOUT; @JsonProperty("auto_offset_reset") - private String autoOffsetReset = AUTO_OFFSET_RESET; + private String autoOffsetReset = DEFAULT_AUTO_OFFSET_RESET; @JsonProperty("group_name") @Valid @@ -148,15 +148,15 @@ public Boolean getAutoCommit() { return autoCommit; } - public Duration getAutoCommitInterval() { - return autoCommitInterval; + public Duration getCommitInterval() { + return commitInterval; } - public void setAutoCommitInterval(Duration autoCommitInterval) { - this.autoCommitInterval = autoCommitInterval; + public void setCommitInterval(Duration commitInterval) { + this.commitInterval = commitInterval; } - public Integer getSessionTimeOut() { + public Duration getSessionTimeOut() { return sessionTimeOut; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index 919756af04..417518836b 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -107,7 +107,7 @@ public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAn if (Objects.isNull(offsetAndMetadata)) { return; } - synchronized (this) { + synchronized (offsetsToCommit) { offsetsToCommit.put(partition, offsetAndMetadata); } } @@ -118,18 +118,34 @@ private AcknowledgementSet createAcknowledgementSet(Map { - int partitionId = partition.partition(); - if (!partitionCommitTrackerMap.containsKey(partitionId)) { - OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition); - Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null; - - partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset)); + try { + int partitionId = partition.partition(); + if (!partitionCommitTrackerMap.containsKey(partitionId)) { + OffsetAndMetadata committedOffsetAndMetadata = null; + synchronized(consumer) { + committedOffsetAndMetadata = consumer.committed(partition); + } + Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null; + partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset)); + } + OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange); + updateOffsetsToCommit(partition, offsetAndMetadata); + } catch (Exception e) { + LOG.error("Failed to seek to last committed offset upon positive acknowledgement "+partition, e); } - OffsetAndMetadata offsetAndMetadata = partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange); - updateOffsetsToCommit(partition, offsetAndMetadata); }); } else { - positiveAcknowledgementSetCounter.increment(); + negativeAcknowledgementSetCounter.increment(); + offsets.forEach((partition, offsetRange) -> { + try { + synchronized(consumer) { + OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition); + consumer.seek(partition, committedOffsetAndMetadata); + } + } catch (Exception e) { + LOG.error("Failed to seek to last committed offset upon negative acknowledgement "+partition, e); + } + }); } }, acknowledgementsTimeout); return acknowledgementSet; @@ -141,9 +157,11 @@ private AcknowledgementSet createAcknowledgementSet(Map void consumeRecords() throws Exception { try { - ConsumerRecords records = - consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2); - if (!records.isEmpty() && records.count() > 0) { + ConsumerRecords records = null; + synchronized(consumer) { + records = consumer.poll(topicConfig.getThreadWaitingTime().toMillis()/2); + } + if (Objects.nonNull(records) && !records.isEmpty() && records.count() > 0) { Map> offsets = new HashMap<>(); AcknowledgementSet acknowledgementSet = null; if (acknowledgementsEnabled) { @@ -168,15 +186,17 @@ private void commitOffsets() { return; } long currentTimeMillis = System.currentTimeMillis(); - if ((currentTimeMillis - lastCommitTime) < COMMIT_OFFSET_INTERVAL_MS) { + if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) { return; } - synchronized (this) { + synchronized (offsetsToCommit) { if (offsetsToCommit.isEmpty()) { return; } try { - consumer.commitSync(); + synchronized(consumer) { + consumer.commitSync(); + } offsetsToCommit.clear(); lastCommitTime = currentTimeMillis; } catch (CommitFailedException e) { @@ -286,8 +306,10 @@ public void shutdownConsumer(){ @Override public void onPartitionsAssigned(Collection partitions) { for (TopicPartition topicPartition : partitions) { - Long committedOffset = consumer.committed(topicPartition).offset(); - consumer.seek(topicPartition, committedOffset); + synchronized(consumer) { + Long committedOffset = consumer.committed(topicPartition).offset(); + consumer.seek(topicPartition, committedOffset); + } } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 662b2c8212..45f5a87446 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -118,7 +118,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig, @Override public void start(Buffer> buffer) { sourceConfig.getTopics().forEach(topic -> { - consumerGroupID = getGroupId(topic.getName()); + consumerGroupID = topic.getGroupId(); Properties consumerProperties = getConsumerProperties(topic); MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); try { @@ -175,10 +175,6 @@ public void stop() { LOG.info("Consumer shutdown successfully..."); } - private String getGroupId(String name) { - return pipelineName + "::" + name; - } - private long calculateLongestThreadWaitingTime() { List topicsList = sourceConfig.getTopics(); return topicsList.stream(). @@ -368,13 +364,13 @@ private void setConsumerTopicProperties(Properties properties, TopicConfig topic properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, topicConfig.getAutoCommit()); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, - topicConfig.getAutoCommitInterval().toSecondsPart()); + ((Long)topicConfig.getCommitInterval().toMillis()).intValue()); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, topicConfig.getAutoOffsetReset()); properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, topicConfig.getConsumerMaxPollRecords()); - properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, topicConfig.getSessionTimeOut()); - properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, topicConfig.getHeartBeatInterval().toSecondsPart()); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue()); + properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue()); properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, topicConfig.getFetchMaxBytes().intValue()); properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait()); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulator.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulator.java deleted file mode 100644 index 826bc39d61..0000000000 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulator.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.kafka.source; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.micrometer.core.instrument.Counter; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.log.JacksonLog; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; -import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - - -/** - * * A helper utility class which helps to write different formats of records - * like json, avro and plaintext to the buffer. - */ -@SuppressWarnings("deprecation") -public class KafkaSourceBufferAccumulator { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBufferAccumulator.class); - private static final String MESSAGE_KEY = "message"; - private final TopicConfig topicConfig; - private final KafkaSourceConfig kafkaSourceConfig; - private final String schemaType; - private PluginMetrics pluginMetrics; - private final Counter kafkaConsumerWriteError; - private static final String KAFKA_CONSUMER_BUFFER_WRITE_ERROR = "kafkaConsumerBufferWriteError"; - private static final int MAX_FLUSH_RETRIES_ON_IO_EXCEPTION = Integer.MAX_VALUE; - private static final Duration INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION = Duration.ofSeconds(5); - private final JsonFactory jsonFactory = new JsonFactory(); - private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final Long COMMIT_OFFSET_INTERVAL_MILLI_SEC = 300000L; - - public KafkaSourceBufferAccumulator(final TopicConfig topicConfigs, - final KafkaSourceConfig kafkaSourceConfig, - final String schemaType, PluginMetrics pluginMetric) { - this.kafkaSourceConfig = kafkaSourceConfig; - this.topicConfig = topicConfigs; - this.schemaType = schemaType; - this.pluginMetrics = pluginMetric; - this.kafkaConsumerWriteError = pluginMetrics.counter(KAFKA_CONSUMER_BUFFER_WRITE_ERROR); - } - - public Record getEventRecord(final String line) { - Map message = new HashMap<>(); - MessageFormat format = MessageFormat.getByMessageFormatByName(schemaType); - if (format.equals(MessageFormat.JSON) || format.equals(MessageFormat.AVRO)) { - try { - final JsonParser jsonParser = jsonFactory.createParser(line); - message = objectMapper.readValue(jsonParser, Map.class); - } catch (Exception e) { - LOG.error("Unable to parse json data [{}]", line, e); - message.put(MESSAGE_KEY, line); - } - } else{ - message.put(MESSAGE_KEY, line); - } - Event event = JacksonLog.builder().withData(message).build(); - return new Record<>(event); - } - - public void write(List> kafkaRecords, final Buffer> buffer) throws Exception { - try { - writeAllRecordToBuffer(kafkaRecords, - buffer, topicConfig); - } catch (Exception e) { - if (canRetry(e)) { - writeWithBackoff(kafkaRecords, buffer, topicConfig); - } - LOG.error("Error occurred while writing data to the buffer {}", e.getMessage()); - kafkaConsumerWriteError.increment(); - } - } - - public synchronized void writeAllRecordToBuffer(List> kafkaRecords, final Buffer> buffer, final TopicConfig topicConfig) throws Exception { - buffer.writeAll(kafkaRecords, - topicConfig.getBufferDefaultTimeout().toSecondsPart()); - } - - public boolean canRetry(final Exception e) { - return (e instanceof IOException || e instanceof TimeoutException || e instanceof ExecutionException - || e instanceof InterruptedException); - } - - public boolean writeWithBackoff(List> kafkaRecords, final Buffer> buffer, final TopicConfig topicConfig) throws Exception { - final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - long nextDelay = INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION.toMillis(); - boolean flushedSuccessfully; - - for (int retryCount = 0; retryCount < MAX_FLUSH_RETRIES_ON_IO_EXCEPTION; retryCount++) { - final ScheduledFuture flushBufferFuture = scheduledExecutorService.schedule(() -> { - try { - writeAllRecordToBuffer(kafkaRecords, buffer, topicConfig); - return true; - } catch (final TimeoutException e) { - return false; - } - }, nextDelay, TimeUnit.MILLISECONDS); - - try { - flushedSuccessfully = flushBufferFuture.get(); - if (flushedSuccessfully) { - LOG.info("Successfully flushed the buffer accumulator on retry attempt {}", retryCount + 1); - scheduledExecutorService.shutdownNow(); - return true; - } - } catch (final ExecutionException exp) { - LOG.warn("Retrying of flushing the buffer accumulator hit an exception: {}", exp); - scheduledExecutorService.shutdownNow(); - throw exp; - } catch (final InterruptedException exp) { - LOG.warn("Retrying of flushing the buffer accumulator was interrupted: {}", exp); - scheduledExecutorService.shutdownNow(); - throw exp; - } - } - LOG.warn("Flushing the bufferAccumulator failed after {} attempts", MAX_FLUSH_RETRIES_ON_IO_EXCEPTION); - scheduledExecutorService.shutdownNow(); - return false; - } - - public long commitOffsets(KafkaConsumer consumer, long lastCommitTime, Map offsetsToCommit) { - try { - long currentTimeMillis = System.currentTimeMillis(); - if (currentTimeMillis - lastCommitTime > COMMIT_OFFSET_INTERVAL_MILLI_SEC) { - if(!offsetsToCommit.isEmpty()) { - consumer.commitSync(offsetsToCommit); - offsetsToCommit.clear(); - LOG.info("Succeeded to commit the offsets ..."); - } - lastCommitTime = currentTimeMillis; - } - } catch (Exception e) { - LOG.error("Failed to commit the offsets...", e); - } - return lastCommitTime; - } - - public long processConsumerRecords(Map offsetsToCommit, - List> kafkaRecords, - long lastReadOffset, ConsumerRecord consumerRecord, List> partitionRecords) { - offsetsToCommit.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), - new OffsetAndMetadata(consumerRecord.offset() + 1, null)); - kafkaRecords.add(getEventRecord(consumerRecord.value())); - lastReadOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); - return lastReadOffset; - } -} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java index 1a095db164..db77e5cfeb 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfigTest.java @@ -68,9 +68,9 @@ void test_topicsConfig_not_null() { void testConfigValues_default() { assertEquals("my-topic-2", topicConfig.getName()); assertEquals(false, topicConfig.getAutoCommit()); - assertEquals(Duration.ofSeconds(5), topicConfig.getAutoCommitInterval()); - assertEquals(45000, topicConfig.getSessionTimeOut()); - assertEquals("earliest", topicConfig.getAutoOffsetReset()); + assertEquals(Duration.ofSeconds(5), topicConfig.getCommitInterval()); + assertEquals(45000, topicConfig.getSessionTimeOut().toMillis()); + assertEquals(TopicConfig.DEFAULT_AUTO_OFFSET_RESET, topicConfig.getAutoOffsetReset()); assertEquals(TopicConfig.THREAD_WAITING_TIME, topicConfig.getThreadWaitingTime()); assertEquals(Duration.ofSeconds(4), topicConfig.getMaxRecordFetchTime()); assertEquals(Duration.ofSeconds(5), topicConfig.getBufferDefaultTimeout()); @@ -80,8 +80,8 @@ void testConfigValues_default() { assertEquals(Duration.ofSeconds(100), topicConfig.getRetryBackoff()); assertEquals(Duration.ofSeconds(300000), topicConfig.getMaxPollInterval()); assertEquals(500L, topicConfig.getConsumerMaxPollRecords().longValue()); - assertEquals(5, topicConfig.getWorkers().intValue()); - assertEquals(Duration.ofSeconds(3), topicConfig.getHeartBeatInterval()); + assertEquals(TopicConfig.DEFAULT_NUM_OF_WORKERS, topicConfig.getWorkers().intValue()); + assertEquals(TopicConfig.HEART_BEAT_INTERVAL_DURATION, topicConfig.getHeartBeatInterval()); } @Test @@ -89,8 +89,8 @@ void testConfigValues_default() { void testConfigValues_from_yaml() { assertEquals("my-topic-1", topicConfig.getName()); assertEquals(false, topicConfig.getAutoCommit()); - assertEquals(Duration.ofSeconds(5), topicConfig.getAutoCommitInterval()); - assertEquals(45000, topicConfig.getSessionTimeOut()); + assertEquals(Duration.ofSeconds(5), topicConfig.getCommitInterval()); + assertEquals(45000, topicConfig.getSessionTimeOut().toMillis()); assertEquals("earliest", topicConfig.getAutoOffsetReset()); assertEquals(Duration.ofSeconds(1), topicConfig.getThreadWaitingTime()); assertEquals(Duration.ofSeconds(4), topicConfig.getMaxRecordFetchTime()); @@ -110,7 +110,7 @@ void testConfigValues_from_yaml() { void testConfigValues_from_yaml_not_null() { assertNotNull(topicConfig.getName()); assertNotNull(topicConfig.getAutoCommit()); - assertNotNull(topicConfig.getAutoCommitInterval()); + assertNotNull(topicConfig.getCommitInterval()); assertNotNull(topicConfig.getSessionTimeOut()); assertNotNull(topicConfig.getAutoOffsetReset()); assertNotNull(topicConfig.getThreadWaitingTime()); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java index 0882259577..da84536a82 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java @@ -212,6 +212,44 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted }); } + @Test + public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws InterruptedException { + String topic = topicConfig.getName(); + consumerRecords = createPlainTextRecords(topic); + when(kafkaConsumer.poll(anyLong())).thenReturn(consumerRecords); + consumer = createObjectUnderTest("plaintext", true); + + try { + consumer.consumeRecords(); + } catch (Exception e){} + final Map.Entry>, CheckpointState> bufferRecords = buffer.read(1000); + ArrayList> bufferedRecords = new ArrayList<>(bufferRecords.getKey()); + Assertions.assertEquals(consumerRecords.count(), bufferedRecords.size()); + Map offsetsToCommit = consumer.getOffsetsToCommit(); + Assertions.assertEquals(offsetsToCommit.size(), 0); + + for (Record record: bufferedRecords) { + Event event = record.getData(); + String value1 = event.get(testKey1, String.class); + String value2 = event.get(testKey2, String.class); + assertTrue(value1 != null || value2 != null); + if (value1 != null) { + Assertions.assertEquals(value1, testValue1); + } + if (value2 != null) { + Assertions.assertEquals(value2, testValue2); + } + event.getEventHandle().release(false); + } + // Wait for acknowledgement callback function to run + try { + Thread.sleep(10000); + } catch (Exception e){} + + offsetsToCommit = consumer.getOffsetsToCommit(); + Assertions.assertEquals(offsetsToCommit.size(), 0); + } + @Test public void testJsonConsumeRecords() throws InterruptedException, Exception { String topic = topicConfig.getName(); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulatorTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulatorTest.java deleted file mode 100644 index c09e133685..0000000000 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulatorTest.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.kafka.source; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - - -@SuppressWarnings("deprecation") -@ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.LENIENT) -class KafkaSourceBufferAccumulatorTest { - - @Mock - private KafkaSourceBufferAccumulator buffer; - - @Mock - private KafkaSourceConfig sourceConfig; - - @Mock - private TopicConfig topicConfig; - @Mock - private KafkaConsumer kafkaConsumer; - - @Mock - List mockList = new ArrayList(); - @Mock - private SchemaConfig schemaConfig; - - @Mock - private PluginMetrics pluginMetrics; - - @Mock - private Buffer> record; - - @Mock - List> kafkaRecords; - - @BeforeEach - void setUp() throws Exception { - when(sourceConfig.getTopics()).thenReturn((mockList)); - when(mockList.get(0)).thenReturn(topicConfig); - when(sourceConfig.getSchemaConfig()).thenReturn(schemaConfig); - - when(sourceConfig.getSchemaConfig()).thenReturn(mock(SchemaConfig.class)); - - buffer = new KafkaSourceBufferAccumulator<>(topicConfig, sourceConfig, "plaintext", pluginMetrics); - } - - @Test - void testWriteEventOrStringToBuffer_plaintext_schemaType() throws Exception { - createObjectWithSchemaType("plaintext"); - - KafkaSourceBufferAccumulator spyBuffer = spy(buffer); - doCallRealMethod().when(spyBuffer).getEventRecord("anyString"); - spyBuffer.getEventRecord("anyString"); - verify(spyBuffer).getEventRecord("anyString"); - assertNotNull(spyBuffer.getEventRecord("anyString")); - } - - @Test - void testWriteEventOrStringToBuffer_json_schemaType() throws Exception { - String json = "{\"writebuffer\":\"true\",\"buffertype\":\"json\"}"; - createObjectWithSchemaType("json"); //Added By Mehak - - KafkaSourceBufferAccumulator spyBuffer = spy(buffer); - doCallRealMethod().when(spyBuffer).getEventRecord(json); - spyBuffer.getEventRecord(json); - verify(spyBuffer).getEventRecord(json); - assertNotNull(spyBuffer.getEventRecord(json)); - } - - @Test - void testWriteEventOrStringToBuffer_json_schemaType_catch_block() throws Exception { - createObjectWithSchemaType("json"); - - KafkaSourceBufferAccumulator spyBuffer = spy(buffer); - doCallRealMethod().when(spyBuffer).getEventRecord("anyString"); - spyBuffer.getEventRecord("anyString"); - verify(spyBuffer).getEventRecord("anyString"); - assertNotNull(spyBuffer.getEventRecord("anyString")); - } - - @Test - void testWriteEventOrStringToBuffer_plaintext_schemaType_catch_block() throws Exception { - createObjectWithSchemaType("plaintext"); - - KafkaSourceBufferAccumulator spyBuffer = spy(buffer); - doCallRealMethod().when(spyBuffer).getEventRecord(null); - spyBuffer.getEventRecord(null); - verify(spyBuffer).getEventRecord(null); - assertNotNull(spyBuffer.getEventRecord(null)); - } - - @Test - void testwrite()throws Exception{ - TopicConfig topicConfig = new TopicConfig(); - SchemaConfig schemaConfig = new SchemaConfig(); - topicConfig.setBufferDefaultTimeout(Duration.ofMillis(100)); - KafkaSourceBufferAccumulator spyBuffer = spy(buffer); - doCallRealMethod().when(spyBuffer).write(kafkaRecords, record); - spyBuffer.write(kafkaRecords, record); - verify(spyBuffer).write(kafkaRecords, record); - } - - private void createObjectWithSchemaType(String schema){ - - topicConfig = new TopicConfig(); - schemaConfig = new SchemaConfig(); - topicConfig.setBufferDefaultTimeout(Duration.ofMillis(100)); - sourceConfig.setSchemaConfig(schemaConfig); - } - - @Test - void testwriteWithBackoff() throws Exception { - TopicConfig topicConfig = new TopicConfig(); - Buffer> bufferObj = mock(Buffer.class); - topicConfig.setBufferDefaultTimeout(Duration.ofMillis(100)); - KafkaSourceBufferAccumulator spyBuffer = spy(buffer); - doCallRealMethod().when(spyBuffer).writeWithBackoff(kafkaRecords, bufferObj, this.topicConfig); - spyBuffer.writeWithBackoff(kafkaRecords, bufferObj, this.topicConfig); - verify(spyBuffer).writeWithBackoff(kafkaRecords, bufferObj, this.topicConfig); - } - - @Test - void testPublishRecordToBuffer_commitOffsets() throws Exception { - topicConfig = new TopicConfig(); - KafkaSourceBufferAccumulator spyBuffer = spy(buffer); - doCallRealMethod().when(spyBuffer).commitOffsets(kafkaConsumer, 0L, null); - spyBuffer.commitOffsets(kafkaConsumer, 0L, null); - verify(spyBuffer).commitOffsets(kafkaConsumer, 0L, null); - } -} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java index ebc358818b..be868b3e6f 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java @@ -71,8 +71,8 @@ void setUp() throws Exception { when(topic2.getName()).thenReturn("topic2"); when(topic1.getWorkers()).thenReturn(2); when(topic2.getWorkers()).thenReturn(3); - when(topic1.getAutoCommitInterval()).thenReturn(Duration.ofSeconds(1)); - when(topic2.getAutoCommitInterval()).thenReturn(Duration.ofSeconds(1)); + when(topic1.getCommitInterval()).thenReturn(Duration.ofSeconds(1)); + when(topic2.getCommitInterval()).thenReturn(Duration.ofSeconds(1)); when(topic1.getAutoOffsetReset()).thenReturn("earliest"); when(topic2.getAutoOffsetReset()).thenReturn("earliest"); when(topic1.getConsumerMaxPollRecords()).thenReturn(1); diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml index 90e0131f09..fe3407471b 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml @@ -9,8 +9,8 @@ log-pipeline: - name: my-topic-1 workers: 5 auto_commit: false - auto_commit_interval: PT5S - session_timeout: 45000 + commit_interval: PT5S + session_timeout: PT45S max_retry_attempts: 1000 auto_offset_reset: earliest thread_waiting_time: PT1S @@ -39,7 +39,6 @@ log-pipeline: sasl: aws_msk_iam: role plaintext: - security_protocol: SASL_SSL username: 5UH4NID4OENKDIBI password: jCmncn77F9asfox3yhgZLCEwQ5fx8pKiXnszMqdt0y1GLrdZO1V1iz95aIe1UubX oauth: