Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GitHub-Issue#2778: Added CouldWatchLogsService, Tests and RetransmissionException #3023

Merged
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
f4dc9da
Elasticsearch client implementation with pit and no context search (#…
graytaylor0 Jun 21, 2023
4387b29
GitHub-Issue#2778: Refactoring config files for CloudWatchLogs Sink (#4)
MaGonzalMayedo Jun 21, 2023
f1c25bb
Added fixes from comments to code (including pathing and nomenclature…
Jun 22, 2023
9d640e5
Refactoring config (#5)
MaGonzalMayedo Jun 22, 2023
b100ee3
Fixed deleted AwsConfig file
Jun 22, 2023
35859b0
Removed the s3 dependency from build.gradle, replaced the AwsAuth.. w…
Jun 26, 2023
7040186
Added modifiable back_off_timer, added threshold test for back_off_ti…
Jun 27, 2023
6b13e21
Added fixes to gradle file, added tests to AwsConfig, and used Reflec…
Jun 28, 2023
3bb125a
Added default value test to ThresholdConfig and renamed getter for ma…
Jun 28, 2023
e5ee1e5
Removed unnecessary imports
Jun 28, 2023
e90b05a
Added cloudwatch-logs to settings.gradle
Jul 3, 2023
43a0a40
Merge branch 'main' into main
MaGonzalMayedo Jul 3, 2023
40cb280
Added a quick fix to the back_off_time range
Jul 4, 2023
1aac686
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo Jul 5, 2023
903ea26
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo Jul 7, 2023
ffe5cbe
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo Jul 12, 2023
b56a845
Added Buffer classes, ClientFactory similar to S3, and ThresholdCheck
Jul 5, 2023
b06ed0b
Removed unnecessary default method from ClientFactory
Jul 5, 2023
7f5a432
Added comments in Buffer Interface, change some default values to sui…
Jul 5, 2023
4576899
Removed unused imports
Jul 7, 2023
a539833
Changed the unused imports, made parameters final in the ThresholdCheck
Jul 7, 2023
c89ea17
Made changes to the tests and the method signatures in ThresholdCheck…
Jul 10, 2023
063e1d3
Removed unused methods/comments
Jul 10, 2023
1aad0b5
Added CloudWatchLogsService, CloudWatchLogsServiceTest and Retransmis…
Jul 12, 2023
cf1f8e1
Fixed retransmission logging fixed value
Jul 12, 2023
77f6d0f
Fixed unused imports
Jul 12, 2023
75d90fe
Fixed making ThresholdCheck public
Jul 12, 2023
5f2f511
Added fixes to ThresholdCheck and CloudWatchLogsService to decouple m…
Jul 12, 2023
fdc5b00
Fixed syntax start import in CloudWatchLogsServiceTest
Jul 12, 2023
0399694
Extracted LogPusher and SinkStopWatch classes for code cleanup. Addde…
Jul 14, 2023
3c02e1d
Changed method uses in CloudWatchLogsService and removed logging the …
Jul 14, 2023
c2a02ec
Added Multithreaded CloudWatchLogsDispatcher for handling various asy…
Jul 19, 2023
abec5e3
Added fixesto test and defaulted the parameters in the config to Clou…
Jul 20, 2023
9bbfedd
Added exponential backofftime
Jul 20, 2023
6e28adc
Fixed unused imports
Jul 20, 2023
90418aa
Fixed up deepcopy of arraylist for service workers in CloudWatchLogsS…
Jul 20, 2023
5971190
Added CloudWatchLogsDispatcher builder pattern, fixed tests for Servi…
Jul 24, 2023
53086c4
Removed unused imports
Jul 25, 2023
26f18a1
Added resetBuffer method, removed unnecessary RetransmissionException…
Jul 25, 2023
a5b8be7
Started making changes to the tests to implement the new class struct…
Jul 26, 2023
070beef
Refactored the CloudWatchLogsDispatcher into two classes with the add…
Jul 27, 2023
6130b08
Fixed issues with locking in try block and added final multithreaded …
Jul 27, 2023
69320ec
Added CloudWatchLogsMetricsTest, changed upper back off time bound an…
Jul 28, 2023
993cbd0
Added changes to javadoc
Jul 28, 2023
2b18115
Update data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensea…
MaGonzalMayedo Jul 28, 2023
eee6197
Fixed comment on CloudWatchLogsDispatcher
Jul 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ dependencies {
implementation 'software.amazon.awssdk:cloudwatch'
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation 'org.projectlombok:lombok:1.18.26'
testImplementation project(path: ':data-prepper-test-common')
testImplementation testLibs.mockito.inline
testImplementation 'org.junit.jupiter:junit-jupiter'
compileOnly 'org.projectlombok:lombok:1.18.24'
annotationProcessor 'org.projectlombok:lombok:1.18.24'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.dataprepper.plugins.sink.buffer;

import java.util.ArrayList;
import java.util.List;

/**
* Buffer that handles the temporary storage of
Expand All @@ -14,6 +14,13 @@
* 2. Transforms to Byte type.
* 3. Returns a Byte type.
*/

/*
TODO:
Need to add PriorityQueue for extracting timestamp, this will need the timestamp and the actual string message itself.
Can refactor the buffer to contain
*/

public interface Buffer {
/**
* Size of buffer in events.
Expand All @@ -31,7 +38,9 @@ public interface Buffer {

byte[] popEvent();

ArrayList<byte[]> getBufferedData();
List<byte[]> getBufferedData();

void clearBuffer();

void resetBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package org.opensearch.dataprepper.plugins.sink.buffer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InMemoryBuffer implements Buffer {
private final ArrayList<byte[]> eventsBuffered;
private List<byte[]> eventsBuffered;
private int bufferSize = 0;

InMemoryBuffer() {
Expand All @@ -33,18 +35,27 @@ public void writeEvent(final byte[] event) {

@Override
public byte[] popEvent() {
if (eventsBuffered.isEmpty()) {
return new byte[0];
}
bufferSize -= eventsBuffered.get(0).length;
return eventsBuffered.remove(0);
}

@Override
public ArrayList<byte[]> getBufferedData() {
return eventsBuffered;
public List<byte[]> getBufferedData() {
return Collections.unmodifiableList(eventsBuffered);
}

@Override
public void clearBuffer() {
bufferSize = 0;
eventsBuffered.clear();
}

@Override
public void resetBuffer() {
bufferSize = 0;
eventsBuffered = new ArrayList<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,21 @@
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

/**
* CwlClientFactory is in charge of reading in
* aws config parameters to return a working
* client for interfacing with
* CloudWatchLogs services.
* CwlClientFactory is in charge of reading in aws config parameters to return a working
* client for interfacing with CloudWatchLogs services.
*/
public final class CloudWatchLogsClientFactory {
private CloudWatchLogsClientFactory() {
}

/**
* Generates a CloudWatchLogs Client based on STS role ARN system credentials.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the createOverrideConfiguration() method you use nested builder. It is better practice to use lambdas instead (easier to read).

return ClientOverrideConfiguration.builder()
            .retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS))
            .build();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I have added the lambda to increase code readability.

* @param awsConfig - AwsConfig specifying region, roles, and header overrides.
* @param awsCredentialsSupplier - AwsCredentialsSupplier Interface for which to create CredentialsProvider for Client config.
* @return CloudWatchLogsClient - used to interact with CloudWatch Logs services.
* @param awsConfig AwsConfig specifying region, roles, and header overrides.
* @param awsCredentialsSupplier AwsCredentialsSupplier Interface for which to create CredentialsProvider for Client config.
* @return CloudWatchLogsClient used to interact with CloudWatch Logs services.
*/
public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialOptions(awsConfig);
Expand All @@ -38,10 +37,8 @@ public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, fi
}

private static ClientOverrideConfiguration createOverrideConfiguration() {
final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS).build();

return ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;

import lombok.Builder;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;

@Builder
public class CloudWatchLogsDispatcher {
private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 2000;
private static final float EXP_TIME_SCALE = 1.25F;
private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsDispatcher.class);
private CloudWatchLogsClient cloudWatchLogsClient;
private CloudWatchLogsMetrics cloudWatchLogsMetrics;
private Executor executor;
private String logGroup;
private String logStream;
private int retryCount;
private long backOffTimeBase;
public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
final CloudWatchLogsMetrics cloudWatchLogsMetrics,
final Executor executor,
final String logGroup, final String logStream,
final int retryCount, final long backOffTimeBase) {
this.cloudWatchLogsClient = cloudWatchLogsClient;
this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
this.logGroup = logGroup;
this.logStream = logStream;
this.retryCount = retryCount;
this.backOffTimeBase = backOffTimeBase;

this.executor = executor;
}

/**
* Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents.
* @param eventMessageBytes Collection of byte arrays holding event messages.
* @return List of InputLogEvents holding the wrapped event messages.
*/
public List<InputLogEvent> prepareInputLogEvents(final Collection<byte[]> eventMessageBytes) {
List<InputLogEvent> logEventList = new ArrayList<>();

/**
* Current implementation, timestamp is generated by system time during transmission.
MaGonzalMayedo marked this conversation as resolved.
Show resolved Hide resolved
* To properly extract timestamp we need to order the InputLogEvents. Can be done by
* refactoring buffer class with timestamp param, or adding a sorting algorithm in between
* making the PLE object (in prepareInputLogEvents).
*/

for (byte[] data : eventMessageBytes) {
InputLogEvent tempLogEvent = InputLogEvent.builder()
.message(new String(data, StandardCharsets.UTF_8))
.timestamp(System.currentTimeMillis())
.build();
logEventList.add(tempLogEvent);
}

return logEventList;
}

public void dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<EventHandle> eventHandles) {
PutLogEventsRequest putLogEventsRequest = PutLogEventsRequest.builder()
.logEvents(inputLogEvents)
.logGroupName(logGroup)
.logStreamName(logStream)
.build();

executor.execute(Uploader.builder()
.cloudWatchLogsClient(cloudWatchLogsClient)
.cloudWatchLogsMetrics(cloudWatchLogsMetrics)
.putLogEventsRequest(putLogEventsRequest)
.eventHandles(eventHandles)
.backOffTimeBase(backOffTimeBase)
.retryCount(retryCount)
.build());
}

@Builder
protected static class Uploader implements Runnable {
private final CloudWatchLogsClient cloudWatchLogsClient;
private final CloudWatchLogsMetrics cloudWatchLogsMetrics;
private final PutLogEventsRequest putLogEventsRequest;
private final Collection<EventHandle> eventHandles;
private final int retryCount;
private final long backOffTimeBase;

@Override
public void run() {
upload();
}

public void upload() {
boolean failedToTransmit = true;
int failCount = 0;

try {
while (failedToTransmit && (failCount < retryCount)) {
try {
cloudWatchLogsClient.putLogEvents(putLogEventsRequest);

cloudWatchLogsMetrics.increaseRequestSuccessCounter(1);
failedToTransmit = false;

} catch (CloudWatchLogsException | SdkClientException e) {
LOG.error("Failed to push logs with error: {}", e.getMessage());
cloudWatchLogsMetrics.increaseRequestFailCounter(1);
Thread.sleep(calculateBackOffTime(backOffTimeBase, failCount));
failCount++;
}
}
} catch (InterruptedException e) {
LOG.warn("Got interrupted while waiting!");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better log needed here. This doesn't tell the customer anything. Interrupted by what? stacktrace? while waiting for what? in what class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I have added a more appropriate message alongside the actual thrown error message.

//TODO: Push to DLQ.
Thread.currentThread().interrupt();
}


if (failedToTransmit) {
cloudWatchLogsMetrics.increaseLogEventFailCounter(eventHandles.size());
releaseEventHandles(false, eventHandles);
} else {
cloudWatchLogsMetrics.increaseLogEventSuccessCounter(eventHandles.size());
releaseEventHandles(true, eventHandles);
}
}

private long calculateBackOffTime(final long backOffTimeBase, final int failCounter) {
long scale = (long)Math.pow(EXP_TIME_SCALE, failCounter);

if (scale >= UPPER_RETRY_TIME_BOUND_MILLISECONDS) {
return UPPER_RETRY_TIME_BOUND_MILLISECONDS;
}

return scale * backOffTimeBase;
}

private void releaseEventHandles(final boolean result, final Collection<EventHandle> eventHandles) {
if (eventHandles.isEmpty()) {
return;
}

for (EventHandle eventHandle : eventHandles) {
eventHandle.release(result);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

/**
* Class is meant to abstract the metric book-keeping of
* CloudWatchLogs metrics so that multiple instances
* may refer to it.
*/
public class CloudWatchLogsMetrics {
protected static final String CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED = "cloudWatchLogsRequestsSucceeded";
protected static final String CLOUDWATCH_LOGS_EVENTS_SUCCEEDED = "cloudWatchLogsEventsSucceeded";
protected static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed";
protected static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed";
private final Counter logEventSuccessCounter;
private final Counter logEventFailCounter;
private final Counter requestSuccessCount;
private final Counter requestFailCount;

public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) {
this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED);
this.requestFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED);
this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED);
this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED);
}

public void increaseLogEventSuccessCounter(int value) {
logEventSuccessCounter.increment(value);
}

public void increaseRequestSuccessCounter(int value) {
requestSuccessCount.increment(value);
}

public void increaseLogEventFailCounter(int value) {
logEventFailCounter.increment(value);
}

public void increaseRequestFailCounter(int value) {
requestFailCount.increment(value);
}
}
Loading
Loading