Skip to content

Commit

Permalink
Merge branch 'sns-sink-plugin' of [email protected]:udaych20/data-preppe…
Browse files Browse the repository at this point in the history
…r.git into sns-sink-plugin
  • Loading branch information
udaych20 committed Aug 2, 2023
2 parents 3ba60d7 + 67f10d9 commit 3a19a4e
Show file tree
Hide file tree
Showing 41 changed files with 397 additions and 440 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs;

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsDispatcher;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsService;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.exception.InvalidBufferTypeException;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@DataPrepperPlugin(name = "cloudwatch_logs", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class)
public class CloudWatchLogsSink extends AbstractSink<Record<Event>> {
private final CloudWatchLogsService cloudWatchLogsService;
private volatile boolean isInitialized;
@DataPrepperPluginConstructor
public CloudWatchLogsSink(final PluginSetting pluginSetting,
final PluginMetrics pluginMetrics,
final CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig,
final AwsCredentialsSupplier awsCredentialsSupplier) {
super(pluginSetting);

AwsConfig awsConfig = cloudWatchLogsSinkConfig.getAwsConfig();
ThresholdConfig thresholdConfig = cloudWatchLogsSinkConfig.getThresholdConfig();

CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics);
CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(),
thresholdConfig.getMaxEventSizeBytes(),
thresholdConfig.getMaxRequestSize(),thresholdConfig.getLogSendInterval());

CloudWatchLogsClient cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier);

BufferFactory bufferFactory = null;
if (cloudWatchLogsSinkConfig.getBufferType().equals("in_memory")) {
bufferFactory = new InMemoryBufferFactory();
}

Executor executor = Executors.newCachedThreadPool();

CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder()
.cloudWatchLogsClient(cloudWatchLogsClient)
.cloudWatchLogsMetrics(cloudWatchLogsMetrics)
.logGroup(cloudWatchLogsSinkConfig.getLogGroup())
.logStream(cloudWatchLogsSinkConfig.getLogStream())
.backOffTimeBase(thresholdConfig.getBackOffTime())
.retryCount(thresholdConfig.getRetryCount())
.executor(executor)
.build();

Buffer buffer;
try {
buffer = bufferFactory.getBuffer();
} catch (NullPointerException e) {
throw new InvalidBufferTypeException("Error loading buffer!");
}

cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsLimits, cloudWatchLogsDispatcher);
}

@Override
public void doInitialize() {
isInitialized = Boolean.TRUE;
}

@Override
public void doOutput(Collection<Record<Event>> records) {
if (records.isEmpty()) {
return;
}

cloudWatchLogsService.processLogEvents(records);
}

@Override
public boolean isReady() {
return isInitialized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer;

/**
* BufferFactory will act as a means for decoupling the rest of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer;

import java.util.ArrayList;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer;

public class InMemoryBufferFactory implements BufferFactory{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client;

import lombok.Builder;
import org.opensearch.dataprepper.model.event.EventHandle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.sink.buffer.Buffer;
import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits;
import org.opensearch.dataprepper.plugins.sink.utils.SinkStopWatch;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.SinkStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.config;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.config;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.config;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.exception;

public class InvalidBufferTypeException extends RuntimeException {
public InvalidBufferTypeException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.utils;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils;
/**
* ThresholdCheck receives parameters for which to reference the
* limits of a buffer and CloudWatchLogsClient before making a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.utils;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils;

import org.apache.commons.lang3.time.StopWatch;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

import java.util.ArrayList;
import java.util.Collection;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class CloudWatchLogsSinkTest {
private PluginSetting mockPluginSetting;
private PluginMetrics mockPluginMetrics;
private CloudWatchLogsSinkConfig mockCloudWatchLogsSinkConfig;
private AwsCredentialsSupplier mockCredentialSupplier;
private AwsConfig mockAwsConfig;
private ThresholdConfig thresholdConfig;
private CloudWatchLogsMetrics mockCloudWatchLogsMetrics;
private CloudWatchLogsClient mockClient;
private static final String TEST_LOG_GROUP = "testLogGroup";
private static final String TEST_LOG_STREAM= "testLogStream";
private static final String TEST_PLUGIN_NAME = "testPluginName";
private static final String TEST_PIPELINE_NAME = "testPipelineName";
private static final String TEST_BUFFER_TYPE = "in_memory";
@BeforeEach
void setUp() {
mockPluginSetting = mock(PluginSetting.class);
mockPluginMetrics = mock(PluginMetrics.class);
mockCloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class);
mockCredentialSupplier = mock(AwsCredentialsSupplier.class);
mockAwsConfig = mock(AwsConfig.class);
thresholdConfig = new ThresholdConfig();
mockCloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class);
mockClient = mock(CloudWatchLogsClient.class);

when(mockCloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(mockAwsConfig);
when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
when(mockCloudWatchLogsSinkConfig.getLogGroup()).thenReturn(TEST_LOG_GROUP);
when(mockCloudWatchLogsSinkConfig.getLogStream()).thenReturn(TEST_LOG_STREAM);
when(mockCloudWatchLogsSinkConfig.getBufferType()).thenReturn(TEST_BUFFER_TYPE);

when(mockPluginSetting.getName()).thenReturn(TEST_PLUGIN_NAME);
when(mockPluginSetting.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
}

CloudWatchLogsSink getTestCloudWatchSink() {
return new CloudWatchLogsSink(mockPluginSetting, mockPluginMetrics, mockCloudWatchLogsSinkConfig,
mockCredentialSupplier);
}

Collection<Record<Event>> getMockedRecords() {
Collection<Record<Event>> testCollection = new ArrayList<>();
Record<Event> mockedEvent = new Record<>(JacksonEvent.fromMessage(""));
Record<Event> spyEvent = spy(mockedEvent);

testCollection.add(spyEvent);

return testCollection;
}

@Test
void WHEN_sink_is_initialized_THEN_sink_is_ready_returns_true() {
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) {
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class),
any(AwsCredentialsSupplier.class)))
.thenReturn(mockClient);

CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink();
testCloudWatchSink.doInitialize();
assertTrue(testCloudWatchSink.isReady());
}
}

@Test
void WHEN_given_sample_empty_records_THEN_records_are_processed() {
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) {
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class),
any(AwsCredentialsSupplier.class)))
.thenReturn(mockClient);

CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink();
testCloudWatchSink.doInitialize();
Collection<Record<Event>> spyEvents = getMockedRecords();

testCloudWatchSink.doOutput(spyEvents);

for (Record<Event> spyEvent : spyEvents) {
verify(spyEvent, atLeast(1)).getData();
}
}
}

@Test
void WHEN_given_sample_empty_records_THEN_records_are_not_processed() {
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) {
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class),
any(AwsCredentialsSupplier.class)))
.thenReturn(mockClient);

CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink();
testCloudWatchSink.doInitialize();
Collection<Record<Event>> spyEvents = spy(ArrayList.class);

assertTrue(spyEvents.isEmpty());

testCloudWatchSink.doOutput(spyEvents);
verify(spyEvents, times(2)).isEmpty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer;

import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down
Loading

0 comments on commit 3a19a4e

Please sign in to comment.