From 27ba7154c69262e6961cf629101485c215463193 Mon Sep 17 00:00:00 2001 From: Marcos Gonzalez Mayedo Date: Thu, 27 Jul 2023 12:13:08 -0700 Subject: [PATCH] Refactored the CloudWatchLogsDispatcher into two classes with the addition of Uploader, introduced simple multithread tests for CloudWatchLogsService Signed-off-by: Marcos Gonzalez Mayedo --- .../sink/client/CloudWatchLogsDispatcher.java | 17 +-- .../sink/client/CloudWatchLogsService.java | 105 ++++++---------- .../sink/packaging/ThreadTaskEvents.java | 30 ----- .../client/CloudWatchLogsDispatcherTest.java | 99 +++++---------- .../client/CloudWatchLogsServiceTest.java | 113 ++++++++++++------ .../plugins/sink/client/UploaderTest.java | 93 ++++++++++++++ 6 files changed, 245 insertions(+), 212 deletions(-) delete mode 100644 data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/packaging/ThreadTaskEvents.java create mode 100644 data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/UploaderTest.java diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java index a7c96053c7..93bb8b9245 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java @@ -7,7 +7,6 @@ import lombok.Builder; import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkClientException; @@ -16,13 +15,14 @@ import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.Executor; -import static java.util.concurrent.Executors.newCachedThreadPool; - +@Builder public class CloudWatchLogsDispatcher { private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 5000; private static final float EXP_TIME_SCALE = 1.5F; @@ -36,6 +36,7 @@ public class CloudWatchLogsDispatcher { private long backOffTimeBase; public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient, final CloudWatchLogsMetrics cloudWatchLogsMetrics, + Executor asyncExecutor, final String logGroup, final String logStream, final int retryCount, final long backOffTimeBase) { this.cloudWatchLogsClient = cloudWatchLogsClient; @@ -45,15 +46,15 @@ public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient, this.retryCount = retryCount; this.backOffTimeBase = backOffTimeBase; - asyncExecutor = newCachedThreadPool(); + this.asyncExecutor = asyncExecutor; } - public List prepareInputLogEvents(final ThreadTaskEvents eventData) { + public List prepareInputLogEvents(final Collection eventMessageBytes) { List logEventList = new ArrayList<>(); - for (byte[] data: eventData.getEventMessages()) { + for (byte[] data : eventMessageBytes) { InputLogEvent tempLogEvent = InputLogEvent.builder() - .message(new String(data)) + .message(new String(data, StandardCharsets.UTF_8)) .timestamp(System.currentTimeMillis()) .build(); logEventList.add(tempLogEvent); @@ -84,7 +85,7 @@ public void dispatchLogs(List inputLogEvents, Collection bufferedEventHandles; private final SinkStopWatch sinkStopWatch; - private final ReentrantLock bufferLock; - private final String logGroup; - private final String logStream; - private final int retryCount; - private final long backOffTimeBase; - + private final ReentrantLock processLock; public CloudWatchLogsService(final Buffer buffer, - final CloudWatchLogsClient cloudWatchLogsClient, - final CloudWatchLogsMetrics cloudWatchLogsMetrics, final CloudWatchLogsLimits cloudWatchLogsLimits, - final String logGroup, final String logStream, - final int retryCount, final long backOffTimeBase) { + final CloudWatchLogsDispatcher cloudWatchLogsDispatcher) { this.buffer = buffer; - this.cloudWatchLogsClient = cloudWatchLogsClient; - this.cloudWatchLogsMetrics = cloudWatchLogsMetrics; this.cloudWatchLogsLimits = cloudWatchLogsLimits; - this.logGroup = logGroup; - this.logStream = logStream; - this.retryCount = retryCount; - this.backOffTimeBase = backOffTimeBase; - this.bufferedEventHandles = new ArrayList<>(); - bufferLock = new ReentrantLock(); + processLock = new ReentrantLock(); sinkStopWatch = new SinkStopWatch(); - cloudWatchLogsDispatcher = new CloudWatchLogsDispatcher(cloudWatchLogsClient, - cloudWatchLogsMetrics, logGroup, logStream, retryCount, backOffTimeBase); + this.cloudWatchLogsDispatcher = cloudWatchLogsDispatcher; } /** @@ -78,62 +57,54 @@ public CloudWatchLogsService(final Buffer buffer, * @param logs - Collection of Record events which hold log data. */ public void processLogEvents(final Collection> logs) { - sinkStopWatch.startIfNotRunning(); - for (Record log: logs) { - String logString = log.getData().toJsonString(); - int logLength = logString.length(); - - if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) { - LOG.warn("Event blocked due to Max Size restriction! {Event Size: {} bytes}", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE)); - continue; + try { + processLock.lock(); + sinkStopWatch.startIfNotRunning(); + for (Record log : logs) { + String logString = log.getData().toJsonString(); + int logLength = logString.length(); + + if (cloudWatchLogsLimits.isGreaterThanMaxEventSize(logLength)) { + LOG.warn("Event blocked due to Max Size restriction! {Event Size: {} bytes}", (logLength + CloudWatchLogsLimits.APPROXIMATE_LOG_EVENT_OVERHEAD_SIZE)); + continue; + } + + long time = sinkStopWatch.getStopWatchTimeSeconds(); + + int bufferSize = buffer.getBufferSize(); + int bufferEventCount = buffer.getEventCount(); + int bufferEventCountWithEvent = bufferEventCount + 1; + int bufferSizeWithAddedEvent = bufferSize + logLength; + + if ((cloudWatchLogsLimits.isGreaterThanLimitReached(time, bufferSizeWithAddedEvent, bufferEventCountWithEvent) && (bufferEventCount > 0))) { + stageLogEvents(); + addToBuffer(log, logString); + } else if (cloudWatchLogsLimits.isEqualToLimitReached(bufferSizeWithAddedEvent, bufferEventCountWithEvent)) { + addToBuffer(log, logString); + stageLogEvents(); + } else { + addToBuffer(log, logString); + } } - - long time = sinkStopWatch.getStopWatchTimeSeconds(); - - bufferLock.lock(); - - int bufferSize = buffer.getBufferSize(); - int bufferEventCount = buffer.getEventCount(); - int bufferEventCountWithEvent = bufferEventCount + 1; - int bufferSizeWithAddedEvent = bufferSize + logLength; - - if ((cloudWatchLogsLimits.isGreaterThanLimitReached(time, bufferSizeWithAddedEvent, bufferEventCountWithEvent) && (bufferEventCount > 0))) { - stageLogEvents(); - addToBuffer(log, logString); - } else if (cloudWatchLogsLimits.isEqualToLimitReached(bufferSizeWithAddedEvent, bufferEventCountWithEvent)) { - addToBuffer(log, logString); - stageLogEvents(); - } else { - addToBuffer(log, logString); - } - - bufferLock.unlock(); + } finally { + processLock.unlock(); } } private void stageLogEvents() { sinkStopWatch.stopAndResetStopWatch(); - List eventMessageClone = buffer.getBufferedData(); - ThreadTaskEvents dataToPush = new ThreadTaskEvents(eventMessageClone, bufferedEventHandles); + List inputLogEvents = cloudWatchLogsDispatcher.prepareInputLogEvents(buffer.getBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEvents, bufferedEventHandles); buffer.resetBuffer(); bufferedEventHandles = new ArrayList<>(); - - List inputLogEvents = cloudWatchLogsDispatcher.prepareInputLogEvents(dataToPush); - cloudWatchLogsDispatcher.dispatchLogs(inputLogEvents, dataToPush.getEventHandles()); } private void addToBuffer(final Record log, final String logString) { if (log.getData().getEventHandle() != null) { bufferedEventHandles.add(log.getData().getEventHandle()); } - buffer.writeEvent(logString.getBytes()); - } - - private void cloneLists(List listToCopy, List listToCopyInto) { - for (byte[] holder: listToCopy) { - listToCopyInto.add(holder.clone()); - } + buffer.writeEvent(logString.getBytes(StandardCharsets.UTF_8)); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/packaging/ThreadTaskEvents.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/packaging/ThreadTaskEvents.java deleted file mode 100644 index fb9653af92..0000000000 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/packaging/ThreadTaskEvents.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.sink.packaging; - -import org.opensearch.dataprepper.model.event.EventHandle; - -import java.util.Collection; - -/** - * Simple data class for packaging event messages and their handles into a queue. - */ -public class ThreadTaskEvents { - Collection eventMessages; - Collection eventHandles; - public ThreadTaskEvents(Collection eventMessages, Collection eventHandles) { - this.eventMessages = eventMessages; - this.eventHandles = eventHandles; - } - - public Collection getEventMessages() { - return eventMessages; - } - - public Collection getEventHandles() { - return eventHandles; - } -} diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcherTest.java index 5b3141ebcc..715aafb24c 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcherTest.java @@ -5,36 +5,28 @@ package org.opensearch.dataprepper.plugins.sink.client; -import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; -import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents; -import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; -import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; -import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executor; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - public class CloudWatchLogsDispatcherTest { +class CloudWatchLogsDispatcherTest { private CloudWatchLogsDispatcher cloudWatchLogsDispatcher; private CloudWatchLogsClient cloudWatchLogsClient; private CloudWatchLogsMetrics cloudWatchLogsMetrics; - private PluginMetrics pluginMetrics; - private Counter requestSuccessCounter; - private Counter requestFailCounter; - private Counter successEventCounter; - private Counter failedEventCounter; + private Executor asyncExecutor; private static final String LOG_GROUP = "testGroup"; private static final String LOG_STREAM = "testStream"; private static final String TEST_STRING = "testMessage"; @@ -42,79 +34,48 @@ public class CloudWatchLogsDispatcherTest { @BeforeEach void setUp() throws InterruptedException { cloudWatchLogsClient = mock(CloudWatchLogsClient.class); - - pluginMetrics = mock(PluginMetrics.class); - requestSuccessCounter = mock(Counter.class); - requestFailCounter = mock(Counter.class); - successEventCounter = mock(Counter.class); - failedEventCounter = mock(Counter.class); - cloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class); - - when(pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED)).thenReturn(successEventCounter); - when(pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED)).thenReturn(requestSuccessCounter); - when(pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED)).thenReturn(failedEventCounter); - when(pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED)).thenReturn(requestFailCounter); + asyncExecutor = mock(Executor.class); } - ThreadTaskEvents getSampleBufferedData() { + Collection getSampleBufferedData() { final ArrayList returnCollection = new ArrayList<>(); - final ArrayList eventHandles = new ArrayList<>(); for (int i = 0; i < ThresholdConfig.DEFAULT_BATCH_SIZE; i++) { returnCollection.add(new String(TEST_STRING).getBytes()); - final EventHandle mockEventHandle = mock(EventHandle.class); - eventHandles.add(mockEventHandle); } - return new ThreadTaskEvents(returnCollection, eventHandles); - } - - CloudWatchLogsDispatcher getCloudWatchLogsDispatcher() { - return new CloudWatchLogsDispatcher(cloudWatchLogsClient, - cloudWatchLogsMetrics, LOG_GROUP, LOG_STREAM, ThresholdConfig.DEFAULT_RETRY_COUNT, - ThresholdConfig.DEFAULT_BACKOFF_TIME); - } - - void establishFailingClientWithCloudWatchLogsExcept() { - when(cloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenThrow(CloudWatchLogsException.class); - } - - void establishFailingClientWithSdkClientExcept() { - when(cloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenThrow(SdkClientException.class); + return returnCollection; } - void setUpInterruptedQueueException() throws InterruptedException { -// when(mockTaskQueue.take()).thenThrow(InterruptedException.class); - } + Collection getSampleEventHandles() { + final ArrayList eventHandles = new ArrayList<>(); - @Test - void check_successful_transmission_test() { - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(); - cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); - cloudWatchLogsDispatcher.dispatchLogs(); + for (int i = 0; i < ThresholdConfig.DEFAULT_BATCH_SIZE; i++) { + final EventHandle mockEventHandle = mock(EventHandle.class); + eventHandles.add(mockEventHandle); + } - verify(cloudWatchLogsMetrics, atLeastOnce()).increaseRequestSuccessCounter(1); - verify(cloudWatchLogsMetrics, atLeastOnce()).increaseLogEventSuccessCounter(ThresholdConfig.DEFAULT_BATCH_SIZE); + return eventHandles; } - @Test - void check_unsuccesful_transmission_with_cloudwatchlogsexcept_test() { - establishFailingClientWithCloudWatchLogsExcept(); - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(); - cloudWatchLogsDispatcher.run(); - - verify(cloudWatchLogsMetrics, times(ThresholdConfig.DEFAULT_RETRY_COUNT)).increaseRequestFailCounter(1); - verify(cloudWatchLogsMetrics, atLeastOnce()).increaseLogEventFailCounter(ThresholdConfig.DEFAULT_BATCH_SIZE); + CloudWatchLogsDispatcher getCloudWatchLogsDispatcher() { + return CloudWatchLogsDispatcher.builder().cloudWatchLogsClient(cloudWatchLogsClient) + .cloudWatchLogsMetrics(cloudWatchLogsMetrics) + .asyncExecutor(asyncExecutor) + .logGroup(LOG_GROUP) + .logStream(LOG_STREAM) + .retryCount(ThresholdConfig.DEFAULT_RETRY_COUNT) + .backOffTimeBase(ThresholdConfig.DEFAULT_BACKOFF_TIME) + .build(); } @Test - void check_unsuccesful_transmission_with_sdkexcept_test() { - establishFailingClientWithSdkClientExcept(); + void check_execute_called_test() { cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(); - cloudWatchLogsDispatcher.run(); + List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, getSampleEventHandles()); - verify(cloudWatchLogsMetrics, times(ThresholdConfig.DEFAULT_RETRY_COUNT)).increaseRequestFailCounter(1); - verify(cloudWatchLogsMetrics, atLeastOnce()).increaseLogEventFailCounter(ThresholdConfig.DEFAULT_BATCH_SIZE); + verify(asyncExecutor, atMostOnce()).execute(any(CloudWatchLogsDispatcher.Uploader.class)); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java index 42283e614d..767d553f9a 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsServiceTest.java @@ -12,36 +12,31 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.sink.buffer.Buffer; +import org.opensearch.dataprepper.plugins.sink.buffer.InMemoryBuffer; import org.opensearch.dataprepper.plugins.sink.buffer.InMemoryBufferFactory; import org.opensearch.dataprepper.plugins.sink.config.CloudWatchLogsSinkConfig; import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; -import org.opensearch.dataprepper.plugins.sink.packaging.ThreadTaskEvents; import org.opensearch.dataprepper.plugins.sink.utils.CloudWatchLogsLimits; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; -import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; import java.util.ArrayList; import java.util.Collection; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.List; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.when; public class CloudWatchLogsServiceTest { - private static final int MAX_QUEUE_SIZE = 100; + private static int SMALL_THREAD_COUNT = 50; + private static int MEDIUM_THREAD_COUNT = 100; + private static int HIGH_THREAD_COUNT = 500; + private static int LARGE_THREAD_COUNT = 1000; private CloudWatchLogsClient mockClient; private CloudWatchLogsMetrics mockMetrics; - private BlockingQueue testQueue; private CloudWatchLogsService cloudWatchLogsService; private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; private ThresholdConfig thresholdConfig; @@ -49,8 +44,6 @@ public class CloudWatchLogsServiceTest { private InMemoryBufferFactory inMemoryBufferFactory; private Buffer buffer; private CloudWatchLogsDispatcher testDispatcher; - private final String logGroup = "testGroup"; - private final String logStream = "testStream"; @BeforeEach void setUp() { @@ -63,14 +56,9 @@ void setUp() { mockClient = mock(CloudWatchLogsClient.class); mockMetrics = mock(CloudWatchLogsMetrics.class); inMemoryBufferFactory = new InMemoryBufferFactory(); - buffer = inMemoryBufferFactory.getBuffer(); testDispatcher = mock(CloudWatchLogsDispatcher.class); - testQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); - - cloudWatchLogsService = new CloudWatchLogsService(buffer, mockClient, mockMetrics, - cloudWatchLogsLimits, - logGroup, logStream, - thresholdConfig.getRetryCount(), thresholdConfig.getBackOffTime()); + cloudWatchLogsService = new CloudWatchLogsService(buffer, + cloudWatchLogsLimits, testDispatcher); } Collection> getSampleRecordsLess() { @@ -97,35 +85,84 @@ Collection> getSampleRecords() { return returnCollection; } - Collection> getSampleRecordsLarge() { - final ArrayList> returnCollection = new ArrayList<>(); - for (int i = 0; i < (thresholdConfig.getBatchSize() * 4); i++) { - JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("testMessage"); - final EventHandle mockEventHandle = mock(EventHandle.class); - mockJacksonEvent.setEventHandle(mockEventHandle); - returnCollection.add(new Record<>(mockJacksonEvent)); - } + void setUpSpyBuffer() { + buffer = spy(InMemoryBuffer.class); + } - return returnCollection; + void setUpRealBuffer() { + buffer = inMemoryBufferFactory.getBuffer(); + } + + CloudWatchLogsService getSampleService() { + return new CloudWatchLogsService(buffer, cloudWatchLogsLimits, testDispatcher); } @Test void check_dispatcher_run_was_not_called() { + setUpRealBuffer(); + cloudWatchLogsService = getSampleService(); cloudWatchLogsService.processLogEvents(getSampleRecordsLess()); - verify(mockClient, never()).putLogEvents(any(PutLogEventsRequest.class)); + verify(testDispatcher, never()).dispatchLogs(any(List.class), any(Collection.class)); } @Test void check_dispatcher_run_was_called_test() throws InterruptedException { + setUpRealBuffer(); + cloudWatchLogsService = getSampleService(); cloudWatchLogsService.processLogEvents(getSampleRecords()); - Thread.sleep(100); - verify(mockClient, atLeastOnce()).putLogEvents(any(PutLogEventsRequest.class)); + verify(testDispatcher, atLeast(1)).dispatchLogs(any(List.class), any(Collection.class)); + } + + //Multithreaded tests: + void testThreadsProcessingLogsWithNormalSample(final int numberOfThreads) throws InterruptedException { + Thread[] threads = new Thread[numberOfThreads]; + Collection> sampleEvents = getSampleRecords(); + + for (int i = 0; i < numberOfThreads; i++) { + threads[i] = new Thread(() -> { + cloudWatchLogsService.processLogEvents(sampleEvents); + }); + threads[i].start(); + } + + for (int i = 0; i < numberOfThreads; i++) { + threads[i].join(); + } } @Test - void check_dispatcher_run_called_heavy_load() throws InterruptedException { - cloudWatchLogsService.processLogEvents(getSampleRecordsLarge()); - Thread.sleep(100); - verify(mockClient, atLeast(4)).putLogEvents(any(PutLogEventsRequest.class)); + void test_buffer_access_with_small_thread_count_test() throws InterruptedException { + setUpSpyBuffer(); + cloudWatchLogsService = getSampleService(); + testThreadsProcessingLogsWithNormalSample(SMALL_THREAD_COUNT); + + verify(buffer, atLeast(SMALL_THREAD_COUNT)).getBufferedData(); + } + + @Test + void test_buffer_access_with_medium_thread_count_test() throws InterruptedException { + setUpSpyBuffer(); + cloudWatchLogsService = getSampleService(); + testThreadsProcessingLogsWithNormalSample(MEDIUM_THREAD_COUNT); + + verify(buffer, atLeast(MEDIUM_THREAD_COUNT)).getBufferedData(); + } + + @Test + void test_buffer_access_with_high_thread_count_test() throws InterruptedException { + setUpSpyBuffer(); + cloudWatchLogsService = getSampleService(); + testThreadsProcessingLogsWithNormalSample(HIGH_THREAD_COUNT); + + verify(buffer, atLeast(HIGH_THREAD_COUNT)).getBufferedData(); + } + + @Test + void test_buffer_access_with_large_thread_count_test() throws InterruptedException { + setUpSpyBuffer(); + cloudWatchLogsService = getSampleService(); + testThreadsProcessingLogsWithNormalSample(LARGE_THREAD_COUNT); + + verify(buffer, atLeast(LARGE_THREAD_COUNT)).getBufferedData(); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/UploaderTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/UploaderTest.java new file mode 100644 index 0000000000..04ecc76bc7 --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/client/UploaderTest.java @@ -0,0 +1,93 @@ +package org.opensearch.dataprepper.plugins.sink.client; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; +import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; + +import java.util.ArrayList; +import java.util.Collection; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class UploaderTest { + private CloudWatchLogsClient cloudWatchLogsClient; + private CloudWatchLogsMetrics cloudWatchLogsMetrics; + + @BeforeEach + void setUp() { + cloudWatchLogsClient = mock(CloudWatchLogsClient.class); + cloudWatchLogsMetrics = mock(CloudWatchLogsMetrics.class); + } + + Collection getTestEventHandles() { + final ArrayList eventHandles = new ArrayList<>(); + for (int i = 0; i < ThresholdConfig.DEFAULT_BATCH_SIZE; i++) { + final EventHandle mockEventHandle = mock(EventHandle.class); + eventHandles.add(mockEventHandle); + } + + return eventHandles; + } + + PutLogEventsRequest getMockPutLogEventsRequest() { + return mock(PutLogEventsRequest.class); + } + + CloudWatchLogsDispatcher.Uploader getUploader() { + return CloudWatchLogsDispatcher.Uploader.builder() + .cloudWatchLogsClient(cloudWatchLogsClient) + .cloudWatchLogsMetrics(cloudWatchLogsMetrics) + .putLogEventsRequest(getMockPutLogEventsRequest()) + .eventHandles(getTestEventHandles()) + .retryCount(ThresholdConfig.DEFAULT_RETRY_COUNT) + .backOffTimeBase(ThresholdConfig.DEFAULT_BACKOFF_TIME) + .build(); + } + + void establishFailingClientWithCloudWatchLogsExcept() { + when(cloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenThrow(CloudWatchLogsException.class); + } + + void establishFailingClientWithSdkClientExcept() { + when(cloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenThrow(SdkClientException.class); + } + + @Test + void check_successful_transmission_test() throws InterruptedException { + CloudWatchLogsDispatcher.Uploader testUploader = getUploader(); + testUploader.run(); + + verify(cloudWatchLogsMetrics, atLeastOnce()).increaseRequestSuccessCounter(1); + verify(cloudWatchLogsMetrics, atLeastOnce()).increaseLogEventSuccessCounter(ThresholdConfig.DEFAULT_BATCH_SIZE); + } + + @Test + void check_unsuccesful_transmission_with_cloudwatchlogsexcept_test() throws InterruptedException { + establishFailingClientWithCloudWatchLogsExcept(); + CloudWatchLogsDispatcher.Uploader testUploader = getUploader(); + testUploader.run(); + + verify(cloudWatchLogsMetrics, times(ThresholdConfig.DEFAULT_RETRY_COUNT)).increaseRequestFailCounter(1); + verify(cloudWatchLogsMetrics, atLeastOnce()).increaseLogEventFailCounter(ThresholdConfig.DEFAULT_BATCH_SIZE); + } + + @Test + void check_unsuccesful_transmission_with_sdkexcept_test() { + establishFailingClientWithSdkClientExcept(); + CloudWatchLogsDispatcher.Uploader testUploader = getUploader(); + testUploader.run(); + + verify(cloudWatchLogsMetrics, times(ThresholdConfig.DEFAULT_RETRY_COUNT)).increaseRequestFailCounter(1); + verify(cloudWatchLogsMetrics, atLeastOnce()).increaseLogEventFailCounter(ThresholdConfig.DEFAULT_BATCH_SIZE); + } +}