Skip to content

Commit

Permalink
GitHub-Issue#2778: Refactored Config to include Duration and ByteCoun…
Browse files Browse the repository at this point in the history
…t types (#3099)

* Converted data types in the configuration to Data-Prepper types

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>

* Added Duration to backOffTime

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>

* Attempting to fix unused imports

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>

---------

Signed-off-by: Marcos Gonzalez Mayedo <[email protected]>
Co-authored-by: Marcos Gonzalez Mayedo <[email protected]>
  • Loading branch information
MaGonzalMayedo and Marcos Gonzalez Mayedo authored Aug 2, 2023
1 parent 7c7062e commit ce2d4dd
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 79 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation 'org.projectlombok:lombok:1.18.26'
implementation 'org.hibernate.validator:hibernate-validator:8.0.0.Final'
testImplementation project(path: ':data-prepper-test-common')
testImplementation testLibs.mockito.inline
testImplementation 'org.junit.jupiter:junit-jupiter'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
CloudWatchLogsMetrics cloudWatchLogsMetrics = new CloudWatchLogsMetrics(pluginMetrics);
CloudWatchLogsLimits cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(),
thresholdConfig.getMaxEventSizeBytes(),
thresholdConfig.getMaxRequestSize(),thresholdConfig.getLogSendInterval());
thresholdConfig.getMaxRequestSizeBytes(),thresholdConfig.getLogSendInterval());

CloudWatchLogsClient cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;
import org.opensearch.dataprepper.model.types.ByteCount;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
* The threshold config holds the different configurations for
Expand All @@ -15,58 +21,58 @@
*/
public class ThresholdConfig {
public static final int DEFAULT_BATCH_SIZE = 25;
public static final int DEFAULT_EVENT_SIZE = 256;
public static final int DEFAULT_SIZE_OF_REQUEST = 1048576;
public static final String DEFAULT_EVENT_SIZE = "256kb";
public static final String DEFAULT_SIZE_OF_REQUEST = "1mb";
public static final int DEFAULT_RETRY_COUNT = 5;
public static final int DEFAULT_LOG_SEND_INTERVAL_TIME = 60;
public static final int DEFAULT_BACKOFF_TIME = 500;
public static final int BYTE_TO_KB_FACTOR = 1024;
public static final long DEFAULT_LOG_SEND_INTERVAL_TIME = 60;
public static final long DEFAULT_BACKOFF_TIME = 500;

@JsonProperty("batch_size")
@Size(min = 1, max = 10000, message = "batch_size amount should be between 1 to 10000")
private int batchSize = DEFAULT_BATCH_SIZE;

@JsonProperty("max_event_size")
@Size(min = 1, max = 256, message = "max_event_size amount should be between 1 to 256 kilobytes")
private int maxEventSize = DEFAULT_EVENT_SIZE;
private String maxEventSize = DEFAULT_EVENT_SIZE;

@JsonProperty("max_request_size")
@Size(min = 1, max = 1048576, message = "max_batch_request_size amount should be between 1 and 1048576 bytes")
private int maxRequestSize = DEFAULT_SIZE_OF_REQUEST;
private String maxRequestSize = DEFAULT_SIZE_OF_REQUEST;

@JsonProperty("retry_count")
@Size(min = 1, max = 15, message = "retry_count amount should be between 1 and 15")
private int retryCount = DEFAULT_RETRY_COUNT;

@JsonProperty("log_send_interval")
@Size(min = 5, max = 300, message = "log_send_interval amount should be between 5 and 300 seconds")
private int logSendInterval = DEFAULT_LOG_SEND_INTERVAL_TIME;
@DurationMin(seconds = 60)
@DurationMax(seconds = 3600)
private Duration logSendInterval = Duration.ofSeconds(DEFAULT_LOG_SEND_INTERVAL_TIME);

@JsonProperty("back_off_time")
@Size(min = 500, max = 1000, message = "back_off_time amount should be between 500 and 1000 milliseconds")
private int backOffTime = DEFAULT_BACKOFF_TIME;
@DurationMin(millis = 500)
@DurationMax(millis = 1000)
private Duration backOffTime = Duration.ofMillis(DEFAULT_BACKOFF_TIME);

public int getBatchSize() {
return batchSize;
}

public int getMaxEventSizeBytes() {
return maxEventSize * BYTE_TO_KB_FACTOR;
public long getMaxEventSizeBytes() {
return ByteCount.parse(maxEventSize).getBytes();
}

public int getMaxRequestSize() {
return maxRequestSize;
public long getMaxRequestSizeBytes() {
return ByteCount.parse(maxRequestSize).getBytes();
}

public int getRetryCount() {
return retryCount;
}

public int getLogSendInterval() {
return logSendInterval;
public long getLogSendInterval() {
return logSendInterval.getSeconds();
}

public int getBackOffTime() {
return backOffTime;
public long getBackOffTime() {
return (backOffTime.get(ChronoUnit.NANOS) / 1000000) + (backOffTime.getSeconds() * 1000);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@

package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils;
/**
* ThresholdCheck receives parameters for which to reference the
* CloudWatchLogsLimits receives parameters for which to reference the
* limits of a buffer and CloudWatchLogsClient before making a
* PutLogEvent request to AWS.
*/
public class CloudWatchLogsLimits {
// Size of overhead for each log event message. See https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
public static final int APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE = 26;
private final int maxBatchSize;
private final int maxEventSizeBytes;
private final int maxRequestSizeBytes;
private final long maxEventSizeBytes;
private final long maxRequestSizeBytes;
private final long logSendInterval;

public CloudWatchLogsLimits(final int maxBatchSize, final int maxEventSizeBytes, final int maxRequestSizeBytes, final int logSendInterval) {
public CloudWatchLogsLimits(final int maxBatchSize, final long maxEventSizeBytes, final long maxRequestSizeBytes, final long logSendInterval) {
this.maxBatchSize = maxBatchSize;
this.maxEventSizeBytes = maxEventSizeBytes;
this.maxRequestSizeBytes = maxRequestSizeBytes;
Expand All @@ -31,8 +31,8 @@ public CloudWatchLogsLimits(final int maxBatchSize, final int maxEventSizeBytes,
* @param batchSize size of batch in events.
* @return boolean true if we exceed the threshold events or false otherwise.
*/
public boolean isGreaterThanLimitReached(final long currentTime, final int currentRequestSize, final int batchSize) {
int bufferSizeWithOverhead = (currentRequestSize + ((batchSize) * APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE));
public boolean isGreaterThanLimitReached(final long currentTime, final long currentRequestSize, final int batchSize) {
long bufferSizeWithOverhead = (currentRequestSize + ((long) (batchSize) * APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE));
return (isGreaterThanBatchSize(batchSize) || isGreaterEqualToLogSendInterval(currentTime)
|| isGreaterThanMaxRequestSize(bufferSizeWithOverhead));
}
Expand All @@ -43,8 +43,8 @@ public boolean isGreaterThanLimitReached(final long currentTime, final int curre
* @param batchSize size of batch in events.
* @return boolean true if we equal the threshold events or false otherwise.
*/
public boolean isEqualToLimitReached(final int currentRequestSize, final int batchSize) {
int bufferSizeWithOverhead = (currentRequestSize + ((batchSize) * APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE));
public boolean isEqualToLimitReached(final long currentRequestSize, final int batchSize) {
long bufferSizeWithOverhead = (currentRequestSize + ((long) (batchSize) * APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE));
return (isEqualBatchSize(batchSize) || isEqualMaxRequestSize(bufferSizeWithOverhead));
}

Expand All @@ -63,7 +63,7 @@ private boolean isGreaterEqualToLogSendInterval(final long currentTimeSeconds) {
* @param eventSize int denoting size of event.
* @return boolean true if greater than MaxEventSize, false otherwise.
*/
public boolean isGreaterThanMaxEventSize(final int eventSize) {
public boolean isGreaterThanMaxEventSize(final long eventSize) {
return (eventSize + APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE) > maxEventSizeBytes;
}

Expand All @@ -72,7 +72,7 @@ public boolean isGreaterThanMaxEventSize(final int eventSize) {
* @param currentRequestSize int denoting size of request(Sum of PutLogEvent messages).
* @return boolean true if greater than Max request size, smaller otherwise.
*/
private boolean isGreaterThanMaxRequestSize(final int currentRequestSize) {
private boolean isGreaterThanMaxRequestSize(final long currentRequestSize) {
return currentRequestSize > maxRequestSizeBytes;
}

Expand All @@ -91,7 +91,7 @@ private boolean isGreaterThanBatchSize(final int batchSize) {
* @param currentRequestSize int denoting size of request(Sum of PutLogEvent messages).
* @return boolean true if equal Max request size, smaller otherwise.
*/
private boolean isEqualMaxRequestSize(final int currentRequestSize) {
private boolean isEqualMaxRequestSize(final long currentRequestSize) {
return currentRequestSize == maxRequestSizeBytes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void setUp() {

thresholdConfig = new ThresholdConfig();
cloudWatchLogsLimits = new CloudWatchLogsLimits(thresholdConfig.getBatchSize(), thresholdConfig.getMaxEventSizeBytes(),
thresholdConfig.getMaxRequestSize(), thresholdConfig.getLogSendInterval());
thresholdConfig.getMaxRequestSizeBytes(), thresholdConfig.getLogSendInterval());

mockClient = mock(CloudWatchLogsClient.class);
mockMetrics = mock(CloudWatchLogsMetrics.class);
Expand Down Expand Up @@ -85,7 +85,7 @@ Collection<Record<Event>> getSampleRecordsCollection() {
Collection<Record<Event>> getSampleRecordsOfLargerSize() {
final ArrayList<Record<Event>> returnCollection = new ArrayList<>();
for (int i = 0; i < thresholdConfig.getBatchSize() * 2; i++) {
JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("a".repeat((ThresholdConfig.DEFAULT_SIZE_OF_REQUEST/24)));
JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("a".repeat((int) (thresholdConfig.getMaxRequestSizeBytes()/24)));
final EventHandle mockEventHandle = mock(EventHandle.class);
mockJacksonEvent.setEventHandle(mockEventHandle);
returnCollection.add(new Record<>(mockJacksonEvent));
Expand All @@ -97,7 +97,7 @@ Collection<Record<Event>> getSampleRecordsOfLargerSize() {
Collection<Record<Event>> getSampleRecordsOfLimitSize() {
final ArrayList<Record<Event>> returnCollection = new ArrayList<>();
for (int i = 0; i < thresholdConfig.getBatchSize(); i++) {
JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("testMessage".repeat(ThresholdConfig.DEFAULT_EVENT_SIZE * ThresholdConfig.BYTE_TO_KB_FACTOR));
JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("testMessage".repeat((int) thresholdConfig.getMaxEventSizeBytes()));
final EventHandle mockEventHandle = mock(EventHandle.class);
mockJacksonEvent.setEventHandle(mockEventHandle);
returnCollection.add(new Record<>(mockJacksonEvent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;

import java.time.Duration;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -31,8 +34,8 @@ void GIVEN_new_threshold_config_SHOULD_return_valid_default_values() {
assertThat(thresholdConfig.getBackOffTime(), equalTo(ThresholdConfig.DEFAULT_BACKOFF_TIME));
assertThat(thresholdConfig.getRetryCount(), equalTo(ThresholdConfig.DEFAULT_RETRY_COUNT));
assertThat(thresholdConfig.getBatchSize(), equalTo(ThresholdConfig.DEFAULT_BATCH_SIZE));
assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ThresholdConfig.DEFAULT_EVENT_SIZE * ThresholdConfig.BYTE_TO_KB_FACTOR));
assertThat(thresholdConfig.getMaxRequestSize(), equalTo(ThresholdConfig.DEFAULT_SIZE_OF_REQUEST));
assertThat(thresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_EVENT_SIZE).getBytes()));
assertThat(thresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(ThresholdConfig.DEFAULT_SIZE_OF_REQUEST).getBytes()));
assertThat(thresholdConfig.getLogSendInterval(), equalTo(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME));
}

Expand All @@ -45,19 +48,19 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_batch_size(final in
}

@ParameterizedTest
@ValueSource(ints = {1, 10, 256})
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_event_size(final int max_event_size) {
final Map<String, Integer> jsonMap = Map.of("max_event_size", max_event_size);
final ThresholdConfig thresholdConfigTest = objectMapper.convertValue(jsonMap, ThresholdConfig.class);
assertThat(thresholdConfigTest.getMaxEventSizeBytes(), equalTo(max_event_size * ThresholdConfig.BYTE_TO_KB_FACTOR));
@ValueSource(strings = {"1kb", "10kb", "256kb"})
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_event_size(final String max_event_size) throws NoSuchFieldException, IllegalAccessException {
ThresholdConfig sampleThresholdConfig = new ThresholdConfig();
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxEventSize", max_event_size);
assertThat(sampleThresholdConfig.getMaxEventSizeBytes(), equalTo(ByteCount.parse(max_event_size).getBytes()));
}

@ParameterizedTest
@ValueSource(ints = {1, 100, 1048576})
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_request_size(final int max_batch_request_size) {
final Map<String, Integer> jsonMap = Map.of("max_request_size", max_batch_request_size);
final ThresholdConfig thresholdConfigTest = objectMapper.convertValue(jsonMap, ThresholdConfig.class);
assertThat(thresholdConfigTest.getMaxRequestSize(), equalTo(max_batch_request_size));
@ValueSource(strings = {"1b", "100b", "1048576b"})
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_request_size(final String max_batch_request_size) throws NoSuchFieldException, IllegalAccessException {
ThresholdConfig sampleThresholdConfig = new ThresholdConfig();
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "maxRequestSize", max_batch_request_size);
assertThat(sampleThresholdConfig.getMaxRequestSizeBytes(), equalTo(ByteCount.parse(max_batch_request_size).getBytes()));
}

@ParameterizedTest
Expand All @@ -70,17 +73,17 @@ void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_retry_count(fin

@ParameterizedTest
@ValueSource(ints = {5, 10, 300})
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_log_send_interval(final int log_send_interval) {
final Map<String, Integer> jsonMap = Map.of("log_send_interval", log_send_interval);
final ThresholdConfig thresholdConfigTest = objectMapper.convertValue(jsonMap, ThresholdConfig.class);
assertThat(thresholdConfigTest.getLogSendInterval(), equalTo(log_send_interval));
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_max_log_send_interval(final int log_send_interval) throws NoSuchFieldException, IllegalAccessException {
ThresholdConfig sampleThresholdConfig = new ThresholdConfig();
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "logSendInterval", Duration.ofSeconds(log_send_interval));
assertThat(sampleThresholdConfig.getLogSendInterval(), equalTo(Duration.ofSeconds(log_send_interval).getSeconds())) ;
}

@ParameterizedTest
@ValueSource(ints = {0, 100, 5000})
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_back_off_time(final int back_off_time) {
final Map<String, Integer> jsonMap = Map.of("back_off_time", back_off_time);
final ThresholdConfig thresholdConfigTest = objectMapper.convertValue(jsonMap, ThresholdConfig.class);
assertThat(thresholdConfigTest.getBackOffTime(), equalTo(back_off_time));
@ValueSource(longs = {0, 500, 1000})
void GIVEN_deserialized_threshold_config_SHOULD_return_valid_back_off_time(final long back_off_time) throws NoSuchFieldException, IllegalAccessException {
ThresholdConfig sampleThresholdConfig = new ThresholdConfig();
ReflectivelySetField.setField(sampleThresholdConfig.getClass(), sampleThresholdConfig, "backOffTime", Duration.ofMillis(back_off_time));
assertThat(sampleThresholdConfig.getBackOffTime(), equalTo(back_off_time));
}
}
}
Loading

0 comments on commit ce2d4dd

Please sign in to comment.