Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into sns-sink-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
udaych20 authored Aug 1, 2023
2 parents b7b6e5e + 5e3765b commit 4849418
Show file tree
Hide file tree
Showing 38 changed files with 2,051 additions and 268 deletions.
3 changes: 3 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ dependencies {
implementation 'software.amazon.awssdk:cloudwatch'
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(path: ':data-prepper-test-common')
testImplementation testLibs.mockito.inline
testImplementation 'org.junit.jupiter:junit-jupiter'
// Lombok is a compile-time annotation processor only; it must not leak onto the
// runtime classpath, so it is declared compileOnly + annotationProcessor (and the
// two declarations must use the same version).
compileOnly 'org.projectlombok:lombok:1.18.26'
annotationProcessor 'org.projectlombok:lombok:1.18.26'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.dataprepper.plugins.sink.buffer;

import java.util.ArrayList;
import java.util.List;

/**
* Buffer that handles the temporary storage of
Expand All @@ -14,6 +14,13 @@
* 2. Transforms to Byte type.
* 3. Returns a Byte type.
*/

/*
TODO: Introduce a PriorityQueue ordered by timestamp so events can be extracted in
time order; this requires buffering the timestamp together with the raw message,
e.g. by refactoring the buffer to hold (timestamp, message) pairs.
*/

public interface Buffer {
/**
* Size of buffer in events.
Expand All @@ -31,7 +38,9 @@ public interface Buffer {

byte[] popEvent();

ArrayList<byte[]> getBufferedData();
List<byte[]> getBufferedData();

void clearBuffer();

void resetBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package org.opensearch.dataprepper.plugins.sink.buffer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InMemoryBuffer implements Buffer {
private final ArrayList<byte[]> eventsBuffered;
private List<byte[]> eventsBuffered;
private int bufferSize = 0;

InMemoryBuffer() {
Expand All @@ -33,18 +35,27 @@ public void writeEvent(final byte[] event) {

@Override
public byte[] popEvent() {
    // An empty buffer yields an empty sentinel array rather than null.
    if (eventsBuffered.isEmpty()) {
        return new byte[0];
    }
    final byte[] head = eventsBuffered.remove(0);
    bufferSize -= head.length;
    return head;
}

@Override
public ArrayList<byte[]> getBufferedData() {
return eventsBuffered;
public List<byte[]> getBufferedData() {
return Collections.unmodifiableList(eventsBuffered);
}

@Override
public void clearBuffer() {
    // Drop every buffered event and reset the running byte-size tally.
    eventsBuffered.clear();
    bufferSize = 0;
}

@Override
public void resetBuffer() {
    // Swap in a brand-new backing list instead of clearing the old one.
    eventsBuffered = new ArrayList<>();
    bufferSize = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,21 @@
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

/**
* CwlClientFactory is in charge of reading in
* aws config parameters to return a working
* client for interfacing with
* CloudWatchLogs services.
* CwlClientFactory is in charge of reading in aws config parameters to return a working
* client for interfacing with CloudWatchLogs services.
*/
public final class CloudWatchLogsClientFactory {
private CloudWatchLogsClientFactory() {
}

/**
* Generates a CloudWatchLogs Client based on STS role ARN system credentials.
* @param awsConfig - AwsConfig specifying region, roles, and header overrides.
* @param awsCredentialsSupplier - AwsCredentialsSupplier Interface for which to create CredentialsProvider for Client config.
* @return CloudWatchLogsClient - used to interact with CloudWatch Logs services.
* @param awsConfig AwsConfig specifying region, roles, and header overrides.
* @param awsCredentialsSupplier AwsCredentialsSupplier Interface for which to create CredentialsProvider for Client config.
* @return CloudWatchLogsClient used to interact with CloudWatch Logs services.
*/
public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialOptions(awsConfig);
Expand All @@ -38,10 +37,8 @@ public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, fi
}

private static ClientOverrideConfiguration createOverrideConfiguration() {
final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS).build();

return ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;

import lombok.Builder;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;

/**
 * Packages buffered log messages into CloudWatch Logs {@code InputLogEvent}s and
 * dispatches them asynchronously via the supplied {@link Executor}. Transmission is
 * retried up to {@code retryCount} times with exponential back-off (base
 * {@code backOffTimeBase} ms, capped at {@code UPPER_RETRY_TIME_BOUND_MILLISECONDS}).
 */
@Builder
public class CloudWatchLogsDispatcher {
    // Hard ceiling on any single back-off sleep between retries.
    private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 2000;
    // Exponential growth factor applied per consecutive failure.
    private static final float EXP_TIME_SCALE = 1.25F;
    private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsDispatcher.class);
    private CloudWatchLogsClient cloudWatchLogsClient;
    private CloudWatchLogsMetrics cloudWatchLogsMetrics;
    private Executor executor;
    private String logGroup;
    private String logStream;
    private int retryCount;
    private long backOffTimeBase;

    /**
     * @param cloudWatchLogsClient client used to call PutLogEvents.
     * @param cloudWatchLogsMetrics shared success/failure counters.
     * @param executor executor on which upload tasks are run.
     * @param logGroup target CloudWatch Logs log group name.
     * @param logStream target CloudWatch Logs log stream name.
     * @param retryCount maximum transmission attempts per request.
     * @param backOffTimeBase base back-off time in milliseconds between retries.
     */
    public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
                                    final CloudWatchLogsMetrics cloudWatchLogsMetrics,
                                    final Executor executor,
                                    final String logGroup, final String logStream,
                                    final int retryCount, final long backOffTimeBase) {
        this.cloudWatchLogsClient = cloudWatchLogsClient;
        this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
        this.logGroup = logGroup;
        this.logStream = logStream;
        this.retryCount = retryCount;
        this.backOffTimeBase = backOffTimeBase;

        this.executor = executor;
    }

    /**
     * Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents.
     * @param eventMessageBytes Collection of byte arrays holding event messages.
     * @return List of InputLogEvents holding the wrapped event messages.
     */
    public List<InputLogEvent> prepareInputLogEvents(final Collection<byte[]> eventMessageBytes) {
        List<InputLogEvent> logEventList = new ArrayList<>();

        /*
         * In the current implementation, the timestamp is generated during transmission.
         * To properly extract timestamp we need to order the InputLogEvents. Can be done by
         * refactoring buffer class with timestamp param, or adding a sorting algorithm in between
         * making the PLE object (in prepareInputLogEvents).
         */
        for (byte[] data : eventMessageBytes) {
            // Messages are assumed to be UTF-8 encoded text — TODO confirm with producers.
            InputLogEvent tempLogEvent = InputLogEvent.builder()
                    .message(new String(data, StandardCharsets.UTF_8))
                    .timestamp(System.currentTimeMillis())
                    .build();
            logEventList.add(tempLogEvent);
        }

        return logEventList;
    }

    /**
     * Submits an asynchronous upload of the given events to the configured
     * log group/stream; the associated EventHandles are released (positively or
     * negatively) by the upload task once transmission settles.
     * @param inputLogEvents events to transmit in a single PutLogEvents request.
     * @param eventHandles handles to release once the request succeeds or is exhausted.
     */
    public void dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<EventHandle> eventHandles) {
        PutLogEventsRequest putLogEventsRequest = PutLogEventsRequest.builder()
                .logEvents(inputLogEvents)
                .logGroupName(logGroup)
                .logStreamName(logStream)
                .build();

        executor.execute(Uploader.builder()
                .cloudWatchLogsClient(cloudWatchLogsClient)
                .cloudWatchLogsMetrics(cloudWatchLogsMetrics)
                .putLogEventsRequest(putLogEventsRequest)
                .eventHandles(eventHandles)
                .backOffTimeBase(backOffTimeBase)
                .retryCount(retryCount)
                .build());
    }

    /**
     * Runnable that performs the actual PutLogEvents call with bounded,
     * exponentially backed-off retries, updating metrics and releasing
     * event handles according to the final outcome.
     */
    @Builder
    protected static class Uploader implements Runnable {
        private final CloudWatchLogsClient cloudWatchLogsClient;
        private final CloudWatchLogsMetrics cloudWatchLogsMetrics;
        private final PutLogEventsRequest putLogEventsRequest;
        private final Collection<EventHandle> eventHandles;
        private final int retryCount;
        private final long backOffTimeBase;

        @Override
        public void run() {
            upload();
        }

        /**
         * Attempts transmission up to retryCount times; on interruption the thread's
         * interrupt flag is restored and the batch is treated as failed.
         */
        public void upload() {
            boolean failedToTransmit = true;
            int failCount = 0;

            try {
                while (failedToTransmit && (failCount < retryCount)) {
                    try {
                        cloudWatchLogsClient.putLogEvents(putLogEventsRequest);

                        cloudWatchLogsMetrics.increaseRequestSuccessCounter(1);
                        failedToTransmit = false;

                    } catch (CloudWatchLogsException | SdkClientException e) {
                        // Pass the throwable so the full stack trace is preserved in logs.
                        LOG.error("Failed to push logs", e);
                        cloudWatchLogsMetrics.increaseRequestFailCounter(1);
                        Thread.sleep(calculateBackOffTime(backOffTimeBase, failCount));
                        failCount++;
                    }
                }
            } catch (InterruptedException e) {
                LOG.warn("Uploader Thread got interrupted during retransmission", e);
                //TODO: Push to DLQ.
                Thread.currentThread().interrupt();
            }

            if (failedToTransmit) {
                cloudWatchLogsMetrics.increaseLogEventFailCounter(eventHandles.size());
                releaseEventHandles(false, eventHandles);
            } else {
                cloudWatchLogsMetrics.increaseLogEventSuccessCounter(eventHandles.size());
                releaseEventHandles(true, eventHandles);
            }
        }

        /**
         * Computes the back-off delay for the given consecutive-failure count.
         * The cap is applied to the actual delay (scale * base) — capping only the
         * scale factor would let a large base push the sleep past the bound.
         * @param backOffTimeBase base delay in milliseconds.
         * @param failCounter number of consecutive failures so far.
         * @return delay in milliseconds, never above UPPER_RETRY_TIME_BOUND_MILLISECONDS.
         */
        private long calculateBackOffTime(final long backOffTimeBase, final int failCounter) {
            final long scaledDelay = (long) (Math.pow(EXP_TIME_SCALE, failCounter) * backOffTimeBase);
            return Math.min(scaledDelay, UPPER_RETRY_TIME_BOUND_MILLISECONDS);
        }

        /**
         * Releases every event handle with the given result (true = delivered).
         * @param result whether the batch was successfully transmitted.
         * @param eventHandles handles to release; may be empty.
         */
        private void releaseEventHandles(final boolean result, final Collection<EventHandle> eventHandles) {
            if (eventHandles.isEmpty()) {
                return;
            }

            for (EventHandle eventHandle : eventHandles) {
                eventHandle.release(result);
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

/**
* Class is meant to abstract the metric book-keeping of
* CloudWatchLogs metrics so that multiple instances
* may refer to it.
*/
public class CloudWatchLogsMetrics {
    protected static final String CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED = "cloudWatchLogsRequestsSucceeded";
    protected static final String CLOUDWATCH_LOGS_EVENTS_SUCCEEDED = "cloudWatchLogsEventsSucceeded";
    protected static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed";
    protected static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed";
    // Field names follow one consistent "*Counter" convention.
    private final Counter logEventSuccessCounter;
    private final Counter logEventFailCounter;
    private final Counter requestSuccessCounter;
    private final Counter requestFailCounter;

    /**
     * Registers the four CloudWatch Logs counters with the plugin's metric registry.
     * @param pluginMetrics registry used to create the counters.
     */
    public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) {
        this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED);
        this.requestFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED);
        this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED);
        this.requestSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED);
    }

    /** Increments the count of successfully delivered log events by {@code value}. */
    public void increaseLogEventSuccessCounter(final int value) {
        logEventSuccessCounter.increment(value);
    }

    /** Increments the count of successful PutLogEvents requests by {@code value}. */
    public void increaseRequestSuccessCounter(final int value) {
        requestSuccessCounter.increment(value);
    }

    /** Increments the count of failed (undelivered) log events by {@code value}. */
    public void increaseLogEventFailCounter(final int value) {
        logEventFailCounter.increment(value);
    }

    /** Increments the count of failed PutLogEvents requests by {@code value}. */
    public void increaseRequestFailCounter(final int value) {
        requestFailCounter.increment(value);
    }
}
Loading

0 comments on commit 4849418

Please sign in to comment.