-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GitHub-Issue#2778: Added CouldWatchLogsService, Tests and RetransmissionException #3023
Merged
dlvenable
merged 46 commits into
opensearch-project:main
from
MaGonzalMayedo:adding_cloud_watch_logs_service
Jul 31, 2023
Merged
Changes from all commits
Commits
Show all changes
46 commits
Select commit
Hold shift + click to select a range
f4dc9da
Elasticsearch client implementation with pit and no context search (#…
graytaylor0 4387b29
GitHub-Issue#2778: Refactoring config files for CloudWatchLogs Sink (#4)
MaGonzalMayedo f1c25bb
Added fixes from comments to code (including pathing and nomenclature…
9d640e5
Refactoring config (#5)
MaGonzalMayedo b100ee3
Fixed deleted AwsConfig file
35859b0
Removed the s3 dependency from build.gradle, replaced the AwsAuth.. w…
7040186
Added modifiable back_off_timer, added threshold test for back_off_ti…
6b13e21
Added fixes to gradle file, added tests to AwsConfig, and used Reflec…
3bb125a
Added default value test to ThresholdConfig and renamed getter for ma…
e5ee1e5
Removed unnecessary imports
e90b05a
Added cloudwatch-logs to settings.gradle
43a0a40
Merge branch 'main' into main
MaGonzalMayedo 40cb280
Added a quick fix to the back_off_time range
1aac686
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo 903ea26
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo ffe5cbe
Merge branch 'opensearch-project:main' into main
MaGonzalMayedo b56a845
Added Buffer classes, ClientFactory similar to S3, and ThresholdCheck
b06ed0b
Removed unnecessary default method from ClientFactory
7f5a432
Added comments in Buffer Interface, change some default values to sui…
4576899
Removed unused imports
a539833
Changed the unused imports, made parameters final in the ThresholdCheck
c89ea17
Made changes to the tests and the method signatures in ThresholdCheck…
063e1d3
Removed unused methods/comments
1aad0b5
Added CloudWatchLogsService, CloudWatchLogsServiceTest and Retransmis…
cf1f8e1
Fixed retransmission logging fixed value
77f6d0f
Fixed unused imports
75d90fe
Fixed making ThresholdCheck public
5f2f511
Added fixes to ThresholdCheck and CloudWatchLogsService to decouple m…
fdc5b00
Fixed syntax start import in CloudWatchLogsServiceTest
0399694
Extracted LogPusher and SinkStopWatch classes for code cleanup. Addde…
3c02e1d
Changed method uses in CloudWatchLogsService and removed logging the …
c2a02ec
Added Multithreaded CloudWatchLogsDispatcher for handling various asy…
abec5e3
Added fixesto test and defaulted the parameters in the config to Clou…
9bbfedd
Added exponential backofftime
6e28adc
Fixed unused imports
90418aa
Fixed up deepcopy of arraylist for service workers in CloudWatchLogsS…
5971190
Added CloudWatchLogsDispatcher builder pattern, fixed tests for Servi…
53086c4
Removed unused imports
26f18a1
Added resetBuffer method, removed unnecessary RetransmissionException…
a5b8be7
Started making changes to the tests to implement the new class struct…
070beef
Refactored the CloudWatchLogsDispatcher into two classes with the add…
6130b08
Fixed issues with locking in try block and added final multithreaded …
69320ec
Added CloudWatchLogsMetricsTest, changed upper back off time bound an…
993cbd0
Added changes to javadoc
2b18115
Update data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensea…
MaGonzalMayedo eee6197
Fixed comment on CloudWatchLogsDispatcher
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
163 changes: 163 additions & 0 deletions
163
...rc/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.client; | ||
|
||
import lombok.Builder; | ||
import org.opensearch.dataprepper.model.event.EventHandle; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import software.amazon.awssdk.core.exception.SdkClientException; | ||
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; | ||
import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; | ||
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; | ||
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.concurrent.Executor; | ||
|
||
@Builder | ||
public class CloudWatchLogsDispatcher { | ||
private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 2000; | ||
private static final float EXP_TIME_SCALE = 1.25F; | ||
private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsDispatcher.class); | ||
private CloudWatchLogsClient cloudWatchLogsClient; | ||
private CloudWatchLogsMetrics cloudWatchLogsMetrics; | ||
private Executor executor; | ||
private String logGroup; | ||
private String logStream; | ||
private int retryCount; | ||
private long backOffTimeBase; | ||
public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient, | ||
final CloudWatchLogsMetrics cloudWatchLogsMetrics, | ||
final Executor executor, | ||
final String logGroup, final String logStream, | ||
final int retryCount, final long backOffTimeBase) { | ||
this.cloudWatchLogsClient = cloudWatchLogsClient; | ||
this.cloudWatchLogsMetrics = cloudWatchLogsMetrics; | ||
this.logGroup = logGroup; | ||
this.logStream = logStream; | ||
this.retryCount = retryCount; | ||
this.backOffTimeBase = backOffTimeBase; | ||
|
||
this.executor = executor; | ||
} | ||
|
||
/** | ||
* Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents. | ||
* @param eventMessageBytes Collection of byte arrays holding event messages. | ||
* @return List of InputLogEvents holding the wrapped event messages. | ||
*/ | ||
public List<InputLogEvent> prepareInputLogEvents(final Collection<byte[]> eventMessageBytes) { | ||
List<InputLogEvent> logEventList = new ArrayList<>(); | ||
|
||
/** | ||
* In the current implementation, the timestamp is generated during transmission. | ||
* To properly extract timestamp we need to order the InputLogEvents. Can be done by | ||
* refactoring buffer class with timestamp param, or adding a sorting algorithm in between | ||
* making the PLE object (in prepareInputLogEvents). | ||
*/ | ||
|
||
for (byte[] data : eventMessageBytes) { | ||
InputLogEvent tempLogEvent = InputLogEvent.builder() | ||
.message(new String(data, StandardCharsets.UTF_8)) | ||
.timestamp(System.currentTimeMillis()) | ||
.build(); | ||
logEventList.add(tempLogEvent); | ||
} | ||
|
||
return logEventList; | ||
} | ||
|
||
public void dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<EventHandle> eventHandles) { | ||
PutLogEventsRequest putLogEventsRequest = PutLogEventsRequest.builder() | ||
.logEvents(inputLogEvents) | ||
.logGroupName(logGroup) | ||
.logStreamName(logStream) | ||
.build(); | ||
|
||
executor.execute(Uploader.builder() | ||
.cloudWatchLogsClient(cloudWatchLogsClient) | ||
.cloudWatchLogsMetrics(cloudWatchLogsMetrics) | ||
.putLogEventsRequest(putLogEventsRequest) | ||
.eventHandles(eventHandles) | ||
.backOffTimeBase(backOffTimeBase) | ||
.retryCount(retryCount) | ||
.build()); | ||
} | ||
|
||
@Builder | ||
protected static class Uploader implements Runnable { | ||
private final CloudWatchLogsClient cloudWatchLogsClient; | ||
private final CloudWatchLogsMetrics cloudWatchLogsMetrics; | ||
private final PutLogEventsRequest putLogEventsRequest; | ||
private final Collection<EventHandle> eventHandles; | ||
private final int retryCount; | ||
private final long backOffTimeBase; | ||
|
||
@Override | ||
public void run() { | ||
upload(); | ||
} | ||
|
||
public void upload() { | ||
boolean failedToTransmit = true; | ||
int failCount = 0; | ||
|
||
try { | ||
while (failedToTransmit && (failCount < retryCount)) { | ||
try { | ||
cloudWatchLogsClient.putLogEvents(putLogEventsRequest); | ||
|
||
cloudWatchLogsMetrics.increaseRequestSuccessCounter(1); | ||
failedToTransmit = false; | ||
|
||
} catch (CloudWatchLogsException | SdkClientException e) { | ||
LOG.error("Failed to push logs with error: {}", e.getMessage()); | ||
cloudWatchLogsMetrics.increaseRequestFailCounter(1); | ||
Thread.sleep(calculateBackOffTime(backOffTimeBase, failCount)); | ||
failCount++; | ||
} | ||
} | ||
} catch (InterruptedException e) { | ||
LOG.warn("Uploader Thread got interrupted during retransmission with exception: {}", e.getMessage()); | ||
//TODO: Push to DLQ. | ||
Thread.currentThread().interrupt(); | ||
} | ||
|
||
|
||
if (failedToTransmit) { | ||
cloudWatchLogsMetrics.increaseLogEventFailCounter(eventHandles.size()); | ||
releaseEventHandles(false, eventHandles); | ||
} else { | ||
cloudWatchLogsMetrics.increaseLogEventSuccessCounter(eventHandles.size()); | ||
releaseEventHandles(true, eventHandles); | ||
} | ||
} | ||
|
||
private long calculateBackOffTime(final long backOffTimeBase, final int failCounter) { | ||
long scale = (long)Math.pow(EXP_TIME_SCALE, failCounter); | ||
|
||
if (scale >= UPPER_RETRY_TIME_BOUND_MILLISECONDS) { | ||
return UPPER_RETRY_TIME_BOUND_MILLISECONDS; | ||
} | ||
|
||
return scale * backOffTimeBase; | ||
} | ||
|
||
private void releaseEventHandles(final boolean result, final Collection<EventHandle> eventHandles) { | ||
if (eventHandles.isEmpty()) { | ||
return; | ||
} | ||
|
||
for (EventHandle eventHandle : eventHandles) { | ||
eventHandle.release(result); | ||
} | ||
} | ||
} | ||
} |
48 changes: 48 additions & 0 deletions
48
...s/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsMetrics.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink.client; | ||
|
||
import io.micrometer.core.instrument.Counter; | ||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
|
||
/** | ||
* Class is meant to abstract the metric book-keeping of | ||
* CloudWatchLogs metrics so that multiple instances | ||
* may refer to it. | ||
*/ | ||
public class CloudWatchLogsMetrics { | ||
protected static final String CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED = "cloudWatchLogsRequestsSucceeded"; | ||
protected static final String CLOUDWATCH_LOGS_EVENTS_SUCCEEDED = "cloudWatchLogsEventsSucceeded"; | ||
protected static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed"; | ||
protected static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed"; | ||
private final Counter logEventSuccessCounter; | ||
private final Counter logEventFailCounter; | ||
private final Counter requestSuccessCount; | ||
private final Counter requestFailCount; | ||
|
||
public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) { | ||
this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED); | ||
this.requestFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED); | ||
this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED); | ||
this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED); | ||
} | ||
|
||
public void increaseLogEventSuccessCounter(int value) { | ||
logEventSuccessCounter.increment(value); | ||
} | ||
|
||
public void increaseRequestSuccessCounter(int value) { | ||
requestSuccessCount.increment(value); | ||
} | ||
|
||
public void increaseLogEventFailCounter(int value) { | ||
logEventFailCounter.increment(value); | ||
} | ||
|
||
public void increaseRequestFailCounter(int value) { | ||
requestFailCount.increment(value); | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the
createOverrideConfiguration()
method you use nested builder. It is better practice to use lambdas instead (easier to read).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you! I have added the lambda to increase code readability.