Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GitHub-Issue#2778: Added CloudWatchLogsSink #3083

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
f4dc9da
Elasticsearch client implementation with pit and no context search (#…
graytaylor0 Jun 21, 2023
4387b29
GitHub-Issue#2778: Refactoring config files for CloudWatchLogs Sink (#4)
MaGonzalMayedo Jun 21, 2023
f1c25bb
Added fixes from comments to code (including pathing and nomenclature…
Jun 22, 2023
9d640e5
Refactoring config (#5)
MaGonzalMayedo Jun 22, 2023
b100ee3
Fixed deleted AwsConfig file
Jun 22, 2023
35859b0
Removed the s3 dependency from build.gradle, replaced the AwsAuth.. w…
Jun 26, 2023
7040186
Added modifiable back_off_timer, added threshold test for back_off_ti…
Jun 27, 2023
6b13e21
Added fixes to gradle file, added tests to AwsConfig, and used Reflec…
Jun 28, 2023
3bb125a
Added default value test to ThresholdConfig and renamed getter for ma…
Jun 28, 2023
e5ee1e5
Removed unnecessary imports
Jun 28, 2023
e90b05a
Added cloudwatch-logs to settings.gradle
Jul 3, 2023
43a0a40
Merge branch 'main' into main
MaGonzalMayedo Jul 3, 2023
40cb280
Added a quick fix to the back_off_time range
Jul 4, 2023
1aac686
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo Jul 5, 2023
903ea26
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo Jul 7, 2023
ffe5cbe
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo Jul 12, 2023
b56a845
Added Buffer classes, ClientFactory similar to S3, and ThresholdCheck
Jul 5, 2023
b06ed0b
Removed unnecessary default method from ClientFactory
Jul 5, 2023
7f5a432
Added comments in Buffer Interface, change some default values to sui…
Jul 5, 2023
4576899
Removed unused imports
Jul 7, 2023
a539833
Changed the unused imports, made parameters final in the ThresholdCheck
Jul 7, 2023
c89ea17
Made changes to the tests and the method signatures in ThresholdCheck…
Jul 10, 2023
063e1d3
Removed unused methods/comments
Jul 10, 2023
1aad0b5
Added CloudWatchLogsService, CloudWatchLogsServiceTest and Retransmis…
Jul 12, 2023
cf1f8e1
Fixed retransmission logging fixed value
Jul 12, 2023
77f6d0f
Fixed unused imports
Jul 12, 2023
75d90fe
Fixed making ThresholdCheck public
Jul 12, 2023
5f2f511
Added fixes to ThresholdCheck and CloudWatchLogsService to decouple m…
Jul 12, 2023
fdc5b00
Fixed syntax start import in CloudWatchLogsServiceTest
Jul 12, 2023
0399694
Extracted LogPusher and SinkStopWatch classes for code cleanup. Addde…
Jul 14, 2023
3c02e1d
Changed method uses in CloudWatchLogsService and removed logging the …
Jul 14, 2023
c2a02ec
Added Multithreaded CloudWatchLogsDispatcher for handling various asy…
Jul 19, 2023
abec5e3
Added fixesto test and defaulted the parameters in the config to Clou…
Jul 20, 2023
9bbfedd
Added exponential backofftime
Jul 20, 2023
6e28adc
Fixed unused imports
Jul 20, 2023
90418aa
Fixed up deepcopy of arraylist for service workers in CloudWatchLogsS…
Jul 20, 2023
394bed7
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo Jul 25, 2023
5971190
Added CloudWatchLogsDispatcher builder pattern, fixed tests for Servi…
Jul 24, 2023
53086c4
Removed unused imports
Jul 25, 2023
26f18a1
Added resetBuffer method, removed unnecessary RetransmissionException…
Jul 25, 2023
a5b8be7
Started making changes to the tests to implement the new class struct…
Jul 26, 2023
070beef
Refactored the CloudWatchLogsDispatcher into two classes with the add…
Jul 27, 2023
6130b08
Fixed issues with locking in try block and added final multithreaded …
Jul 27, 2023
69320ec
Added CloudWatchLogsMetricsTest, changed upper back off time bound an…
Jul 28, 2023
993cbd0
Added changes to javadoc
Jul 28, 2023
377a0fe
Merge branch 'adding_cloud_watch_logs_service' into cloudwatchlogs_si…
Jul 28, 2023
2b18115
Update data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensea…
MaGonzalMayedo Jul 28, 2023
eee6197
Fixed comment on CloudWatchLogsDispatcher
Jul 28, 2023
e63e67d
Merge branch 'adding_cloud_watch_logs_service' into cloudwatchlogs_si…
Jul 28, 2023
de0621f
Added CloudWatchLogsSink and CloudWatchLogsSinkTest
Jul 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ dependencies {
implementation 'software.amazon.awssdk:cloudwatch'
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation 'org.projectlombok:lombok:1.18.26'
testImplementation project(path: ':data-prepper-test-common')
testImplementation testLibs.mockito.inline
testImplementation 'org.junit.jupiter:junit-jupiter'
compileOnly 'org.projectlombok:lombok:1.18.24'
annotationProcessor 'org.projectlombok:lombok:1.18.24'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.plugins.sink.buffer.Buffer;
import org.opensearch.dataprepper.plugins.sink.buffer.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.buffer.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsDispatcher;
import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsMetrics;
import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsService;
import org.opensearch.dataprepper.plugins.sink.client.CloudWatchLogsClientFactory;
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.config.CloudWatchLogsSinkConfig;
import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig;
import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@DataPrepperPlugin(name = "cloudwatchlogs-sink", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class)
public class CloudWatchLogsSink extends AbstractSink<Record<Event>> {
private final CloudWatchLogsService cloudWatchLogsService;
private boolean isInitialized;
@DataPrepperPluginConstructor
public CloudWatchLogsSink(final PluginSetting pluginSetting,
final PluginMetrics pluginMetrics,
final CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig,
final AwsCredentialsSupplier awsCredentialsSupplier) {
super(pluginSetting);

AwsConfig awsConfig = cloudWatchLogsSinkConfig.getAwsConfig();
ThresholdConfig thresholdConfig = cloudWatchLogsSinkConfig.getThresholdConfig();

CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics);
CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(),
thresholdConfig.getMaxEventSizeBytes(),
thresholdConfig.getMaxRequestSize(),thresholdConfig.getLogSendInterval());

CloudWatchLogsClient cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier);
BufferFactory bufferFactory = new InMemoryBufferFactory();

Executor executor = Executors.newCachedThreadPool();

CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder()
.cloudWatchLogsClient(cloudWatchLogsClient)
.cloudWatchLogsMetrics(cloudWatchLogsMetrics)
.logGroup(cloudWatchLogsSinkConfig.getLogGroup())
.logStream(cloudWatchLogsSinkConfig.getLogStream())
.backOffTimeBase(thresholdConfig.getBackOffTime())
.retryCount(thresholdConfig.getRetryCount())
.executor(executor)
.build();

Buffer buffer = bufferFactory.getBuffer();

cloudWatchLogsService = new CloudWatchLogsService(buffer, cloudWatchLogsLimits, cloudWatchLogsDispatcher);
}

@Override
public void doInitialize() {
isInitialized = true;
}

@Override
public void doOutput(Collection<Record<Event>> records) {
if (records.isEmpty()) {
return;
}

cloudWatchLogsService.processLogEvents(records);
}

@Override
public boolean isReady() {
return isInitialized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.dataprepper.plugins.sink.buffer;

import java.util.ArrayList;
import java.util.List;

/**
* Buffer that handles the temporary storage of
Expand All @@ -14,6 +14,13 @@
* 2. Transforms to Byte type.
* 3. Returns a Byte type.
*/

/*
TODO:
Need to add PriorityQueue for extracting timestamp, this will need the timestamp and the actual string message itself.
Can refactor the buffer to contain
*/

public interface Buffer {
/**
* Size of buffer in events.
Expand All @@ -31,7 +38,9 @@ public interface Buffer {

byte[] popEvent();

ArrayList<byte[]> getBufferedData();
List<byte[]> getBufferedData();

void clearBuffer();

void resetBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package org.opensearch.dataprepper.plugins.sink.buffer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InMemoryBuffer implements Buffer {
private final ArrayList<byte[]> eventsBuffered;
private List<byte[]> eventsBuffered;
private int bufferSize = 0;

InMemoryBuffer() {
Expand All @@ -33,18 +35,27 @@ public void writeEvent(final byte[] event) {

@Override
public byte[] popEvent() {
if (eventsBuffered.isEmpty()) {
return new byte[0];
}
bufferSize -= eventsBuffered.get(0).length;
return eventsBuffered.remove(0);
}

@Override
public ArrayList<byte[]> getBufferedData() {
return eventsBuffered;
public List<byte[]> getBufferedData() {
return Collections.unmodifiableList(eventsBuffered);
}

@Override
public void clearBuffer() {
bufferSize = 0;
eventsBuffered.clear();
}

@Override
public void resetBuffer() {
bufferSize = 0;
eventsBuffered = new ArrayList<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,21 @@
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

/**
* CwlClientFactory is in charge of reading in
* aws config parameters to return a working
* client for interfacing with
* CloudWatchLogs services.
* CwlClientFactory is in charge of reading in aws config parameters to return a working
* client for interfacing with CloudWatchLogs services.
*/
public final class CloudWatchLogsClientFactory {
private CloudWatchLogsClientFactory() {
}

/**
* Generates a CloudWatchLogs Client based on STS role ARN system credentials.
* @param awsConfig - AwsConfig specifying region, roles, and header overrides.
* @param awsCredentialsSupplier - AwsCredentialsSupplier Interface for which to create CredentialsProvider for Client config.
* @return CloudWatchLogsClient - used to interact with CloudWatch Logs services.
* @param awsConfig AwsConfig specifying region, roles, and header overrides.
* @param awsCredentialsSupplier AwsCredentialsSupplier Interface for which to create CredentialsProvider for Client config.
* @return CloudWatchLogsClient used to interact with CloudWatch Logs services.
*/
public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialOptions(awsConfig);
Expand All @@ -38,10 +37,8 @@ public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, fi
}

private static ClientOverrideConfiguration createOverrideConfiguration() {
final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS).build();

return ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;

import lombok.Builder;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;

@Builder
public class CloudWatchLogsDispatcher {
private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 2000;
private static final float EXP_TIME_SCALE = 1.25F;
private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsDispatcher.class);
private CloudWatchLogsClient cloudWatchLogsClient;
private CloudWatchLogsMetrics cloudWatchLogsMetrics;
private Executor executor;
private String logGroup;
private String logStream;
private int retryCount;
private long backOffTimeBase;
public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
final CloudWatchLogsMetrics cloudWatchLogsMetrics,
final Executor executor,
final String logGroup, final String logStream,
final int retryCount, final long backOffTimeBase) {
this.cloudWatchLogsClient = cloudWatchLogsClient;
this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
this.logGroup = logGroup;
this.logStream = logStream;
this.retryCount = retryCount;
this.backOffTimeBase = backOffTimeBase;

this.executor = executor;
}

/**
* Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents.
* @param eventMessageBytes Collection of byte arrays holding event messages.
* @return List of InputLogEvents holding the wrapped event messages.
*/
public List<InputLogEvent> prepareInputLogEvents(final Collection<byte[]> eventMessageBytes) {
List<InputLogEvent> logEventList = new ArrayList<>();

/**
* In the current implementation, the timestamp is generated during transmission.
* To properly extract timestamp we need to order the InputLogEvents. Can be done by
* refactoring buffer class with timestamp param, or adding a sorting algorithm in between
* making the PLE object (in prepareInputLogEvents).
*/

for (byte[] data : eventMessageBytes) {
InputLogEvent tempLogEvent = InputLogEvent.builder()
.message(new String(data, StandardCharsets.UTF_8))
.timestamp(System.currentTimeMillis())
.build();
logEventList.add(tempLogEvent);
}

return logEventList;
}

public void dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<EventHandle> eventHandles) {
PutLogEventsRequest putLogEventsRequest = PutLogEventsRequest.builder()
.logEvents(inputLogEvents)
.logGroupName(logGroup)
.logStreamName(logStream)
.build();

executor.execute(Uploader.builder()
.cloudWatchLogsClient(cloudWatchLogsClient)
.cloudWatchLogsMetrics(cloudWatchLogsMetrics)
.putLogEventsRequest(putLogEventsRequest)
.eventHandles(eventHandles)
.backOffTimeBase(backOffTimeBase)
.retryCount(retryCount)
.build());
}

@Builder
protected static class Uploader implements Runnable {
private final CloudWatchLogsClient cloudWatchLogsClient;
private final CloudWatchLogsMetrics cloudWatchLogsMetrics;
private final PutLogEventsRequest putLogEventsRequest;
private final Collection<EventHandle> eventHandles;
private final int retryCount;
private final long backOffTimeBase;

@Override
public void run() {
upload();
}

public void upload() {
boolean failedToTransmit = true;
int failCount = 0;

try {
while (failedToTransmit && (failCount < retryCount)) {
try {
cloudWatchLogsClient.putLogEvents(putLogEventsRequest);

cloudWatchLogsMetrics.increaseRequestSuccessCounter(1);
failedToTransmit = false;

} catch (CloudWatchLogsException | SdkClientException e) {
LOG.error("Failed to push logs with error: {}", e.getMessage());
cloudWatchLogsMetrics.increaseRequestFailCounter(1);
Thread.sleep(calculateBackOffTime(backOffTimeBase, failCount));
failCount++;
}
}
} catch (InterruptedException e) {
LOG.warn("Uploader Thread got interrupted during retransmission with exception: {}", e.getMessage());
//TODO: Push to DLQ.
Thread.currentThread().interrupt();
}


if (failedToTransmit) {
cloudWatchLogsMetrics.increaseLogEventFailCounter(eventHandles.size());
releaseEventHandles(false, eventHandles);
} else {
cloudWatchLogsMetrics.increaseLogEventSuccessCounter(eventHandles.size());
releaseEventHandles(true, eventHandles);
}
}

private long calculateBackOffTime(final long backOffTimeBase, final int failCounter) {
long scale = (long)Math.pow(EXP_TIME_SCALE, failCounter);

if (scale >= UPPER_RETRY_TIME_BOUND_MILLISECONDS) {
return UPPER_RETRY_TIME_BOUND_MILLISECONDS;
}

return scale * backOffTimeBase;
}

private void releaseEventHandles(final boolean result, final Collection<EventHandle> eventHandles) {
if (eventHandles.isEmpty()) {
return;
}

for (EventHandle eventHandle : eventHandles) {
eventHandle.release(result);
}
}
}
}
Loading
Loading