Skip to content

Commit

Permalink
Merge branch 'main' into sns-sink-plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Uday Chintala <[email protected]>
  • Loading branch information
udaych20 authored Aug 4, 2023
2 parents 3a19a4e + fbfb82e commit 7088650
Show file tree
Hide file tree
Showing 72 changed files with 2,444 additions and 697 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run basic grok end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:log:${{ matrix.test }}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run raw-span latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:peerforwarder:localAggregateEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run raw-span latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:peerforwarder:${{ matrix.test }}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run raw-span latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanLatestReleaseCompatibilityEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run raw-span end-to-end tests with Gradle
run: ./gradlew -PopenTelemetryVersion=${{ matrix.otelVersion }} -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run raw-span latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanPeerForwarderEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run service-map end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:serviceMapPeerForwarderEndToEndTest
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Build with Gradle
run: ./gradlew build
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run Open Distro docker
run: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run OpenSearch docker
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/performance-test-compile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Build performance tests with Gradle
run: ./gradlew :performance-test:compileGatlingJava
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Get Version
run: grep '^version=' gradle.properties >> $GITHUB_ENV
Expand Down Expand Up @@ -62,7 +62,7 @@ jobs:
timeout-minutes: 30

steps:
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Get Version
run: grep '^version=' gradle.properties >> $GITHUB_ENV
Expand All @@ -88,7 +88,7 @@ jobs:
timeout-minutes: 8

steps:
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Get Version
run: grep '^version=' gradle.properties >> $GITHUB_ENV
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/staging-resources-cdk-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
with:
node-version: '16'

- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2

- name: Install NPM Dependencies
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/third-party-generate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2

- name: Generate Third Party Report
Expand Down
83 changes: 83 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# CloudWatch Logs Sink

This Data Prepper sink allows the sending of log data to CloudWatch Logs via a CloudWatchLogsClient.

## Usages

The CloudWatch Logs sink should be configured as part of the Data Prepper pipeline YAML file.

## Configuration Options

```
pipeline:
...
sink:
- cloudwatch_logs:
aws:
region: us-east-1
sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
sts_header_overrides:
custom_header: ...
custom_header2: ...
...
sts_external_id: 123ABC
log_group: sample_group
log_stream: sample_stream
buffer_type: in_memory
threshold:
batch_size: 10000
max_event_size: 256kb
max_request_size: 1mb
retry_count: 5
back_off_time: 500ms
```

## AWS Configuration

- `region` (Optional) : A string representing the AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).

- `sts_role_arn` (Optional) : A string representing the AWS STS role to assume for requests to CloudWatch Logs. If it is not set, the plugin will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).

- `sts_header_overrides` (Optional) : A string map representing different custom headers that can be added.

- `sts_external_id` (Optional) : A string representing the external ID to attach to AssumeRole requests. Referenced here: [how to use external ID](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html)

## Threshold Configuration

- `batch_size` (Optional) : An integer value that indicates how many events we hold until we make a call to CloudWatch Logs. Defaults to 25. (Min = 1, Max = 10000)

- `max_event_size` (Optional) : A string representing the max size in bytes of the allowed events. Defaults to "256kb". (Min = "1b", Max = "256kb")

- `max_request_size` (Optional) : A string representing the total size in bytes of the events we hold until we make a call to CloudWatch Logs. Defaults to "1mb". (Min = "1b", Max = "1mb")

- `retry_count` (Optional) : An integer value that indicates the number of retries we make when encountering errors sending logs to CloudWatch Logs. Defaults to 5. (Min = 1, Max = 15)

- `log_send_interval` (Optional) : A string representing the amount of time in seconds between making requests. Defaults to "60s". (Min = "5s", Max = "300s")

- `back_off_time` (Optional) : A string representing the amount of time in milliseconds between errored transmission re-attempts. Defaults to "500ms". (Min = "500ms", Max = "1000ms")

## Buffer Type Configuration

- `buffer_type` (Optional) : A string representing the type of buffer to use to hold onto events. Currently only supports `in_memory`.

## Plugin Functionality
The cloudwatch_logs sink plugin uses credentials to establish a client to CloudWatch Logs. It uses the current system timestamp when publishing events and implements an exponential backoff strategy
for retransmission.

The cloudwatch_logs sink plugin also incurs an overhead of 26 bytes for each event message. This overhead is added by the AWS SDK when formatting the API call to CloudWatch Logs and must be considered when setting custom
threshold parameters.

## Metrics

### Counters

* `cloudWatchLogsEventsSucceeded` - The number of log events successfully published to CloudWatch Logs.
* `cloudWatchLogsEventsFailed` - The number of log events that failed while being published to CloudWatch Logs.
* `cloudWatchLogsRequestsSucceeded` - The number of log requests successfully made to CloudWatch Logs.
* `cloudWatchLogsRequestsFailed` - The number of log requests that failed to reach CloudWatch Logs.

## Developer Guide

This plugin is compatible with Java 11. See the following guides for more information:

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
1 change: 1 addition & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation 'org.projectlombok:lombok:1.18.26'
implementation 'org.hibernate.validator:hibernate-validator:8.0.0.Final'
testImplementation project(path: ':data-prepper-test-common')
testImplementation testLibs.mockito.inline
testImplementation 'org.junit.jupiter:junit-jupiter'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics);
CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(),
thresholdConfig.getMaxEventSizeBytes(),
thresholdConfig.getMaxRequestSize(),thresholdConfig.getLogSendInterval());
thresholdConfig.getMaxRequestSizeBytes(),thresholdConfig.getLogSendInterval());

CloudWatchLogsClient cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;
import org.opensearch.dataprepper.model.types.ByteCount;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
* The threshold config holds the different configurations for
Expand All @@ -15,58 +21,58 @@
*/
// Holds the threshold settings read from the sink configuration (batch size, event/request
// size limits, retry count, send interval and back-off time) together with their defaults.
// NOTE(review): this listing is a diff rendering. Where two declarations of the same member
// appear back to back, the first appears to be the removed version and the second its replacement.
public class ThresholdConfig {
public static final int DEFAULT_BATCH_SIZE = 25;
public static final int DEFAULT_EVENT_SIZE = 256;
public static final int DEFAULT_SIZE_OF_REQUEST = 1048576;
// Size defaults are kept as text and converted with ByteCount.parse(...) in the getters below.
public static final String DEFAULT_EVENT_SIZE = "256kb";
public static final String DEFAULT_SIZE_OF_REQUEST = "1mb";
public static final int DEFAULT_RETRY_COUNT = 5;
public static final int DEFAULT_LOG_SEND_INTERVAL_TIME = 60;
public static final int DEFAULT_BACKOFF_TIME = 500;
public static final int BYTE_TO_KB_FACTOR = 1024;
// Passed to Duration.ofSeconds(...) below, i.e. a number of seconds.
public static final long DEFAULT_LOG_SEND_INTERVAL_TIME = 60;
// Passed to Duration.ofMillis(...) below, i.e. a number of milliseconds.
public static final long DEFAULT_BACKOFF_TIME = 500;

// NOTE(review): Jakarta Bean Validation specifies @Size for CharSequence, Collection, Map and
// array lengths. Confirm that it constrains this int field as intended; @Min/@Max is the usual
// numeric range check.
@JsonProperty("batch_size")
@Size(min = 1, max = 10000, message = "batch_size amount should be between 1 to 10000")
private int batchSize = DEFAULT_BATCH_SIZE;

// NOTE(review): on the String version of this field, @Size would bound the number of characters
// in the configured text (e.g. "256kb"), not the parsed byte count — confirm this is intended.
@JsonProperty("max_event_size")
@Size(min = 1, max = 256, message = "max_event_size amount should be between 1 to 256 kilobytes")
private int maxEventSize = DEFAULT_EVENT_SIZE;
private String maxEventSize = DEFAULT_EVENT_SIZE;

// NOTE(review): same concern as max_event_size — @Size on a String checks text length, so the
// 1..1048576 range would not limit the parsed request size. The message also names
// "max_batch_request_size" while the property is "max_request_size".
@JsonProperty("max_request_size")
@Size(min = 1, max = 1048576, message = "max_batch_request_size amount should be between 1 and 1048576 bytes")
private int maxRequestSize = DEFAULT_SIZE_OF_REQUEST;
private String maxRequestSize = DEFAULT_SIZE_OF_REQUEST;

// NOTE(review): @Size on an int field — see the note on batch_size.
@JsonProperty("retry_count")
@Size(min = 1, max = 15, message = "retry_count amount should be between 1 and 15")
private int retryCount = DEFAULT_RETRY_COUNT;

// NOTE(review): the Duration bounds here are 60 to 3600 seconds, whereas the removed @Size and
// the README added in this change both state 5 to 300 seconds — confirm which range is intended.
@JsonProperty("log_send_interval")
@Size(min = 5, max = 300, message = "log_send_interval amount should be between 5 and 300 seconds")
private int logSendInterval = DEFAULT_LOG_SEND_INTERVAL_TIME;
@DurationMin(seconds = 60)
@DurationMax(seconds = 3600)
private Duration logSendInterval = Duration.ofSeconds(DEFAULT_LOG_SEND_INTERVAL_TIME);

// Bounds of 500 to 1000 milliseconds match the removed @Size message.
@JsonProperty("back_off_time")
@Size(min = 500, max = 1000, message = "back_off_time amount should be between 500 and 1000 milliseconds")
private int backOffTime = DEFAULT_BACKOFF_TIME;
@DurationMin(millis = 500)
@DurationMax(millis = 1000)
private Duration backOffTime = Duration.ofMillis(DEFAULT_BACKOFF_TIME);

// Returns the configured batch_size value.
public int getBatchSize() {
return batchSize;
}

// Returns the max_event_size setting in bytes; the String version is parsed with ByteCount.
public int getMaxEventSizeBytes() {
return maxEventSize * BYTE_TO_KB_FACTOR;
public long getMaxEventSizeBytes() {
return ByteCount.parse(maxEventSize).getBytes();
}

// Returns the max_request_size setting in bytes; renamed from getMaxRequestSize in this change.
public int getMaxRequestSize() {
return maxRequestSize;
public long getMaxRequestSizeBytes() {
return ByteCount.parse(maxRequestSize).getBytes();
}

// Returns the configured retry_count value.
public int getRetryCount() {
return retryCount;
}

// Returns the log_send_interval setting as a whole number of seconds.
public int getLogSendInterval() {
return logSendInterval;
public long getLogSendInterval() {
return logSendInterval.getSeconds();
}

// Returns the back_off_time setting in milliseconds: the nanosecond part divided by 1,000,000
// plus the whole seconds multiplied by 1000.
// NOTE(review): this looks equivalent to Duration.toMillis() for the validated range — confirm.
public int getBackOffTime() {
return backOffTime;
public long getBackOffTime() {
return (backOffTime.get(ChronoUnit.NANOS) / 1000000) + (backOffTime.getSeconds() * 1000);
}
}
}
Loading

0 comments on commit 7088650

Please sign in to comment.