diff --git a/data-prepper-plugins/log-generator-source/src/test/java/org/opensearch/dataprepper/plugins/source/loggenerator/LogGeneratorSourceTest.java b/data-prepper-plugins/log-generator-source/src/test/java/org/opensearch/dataprepper/plugins/source/loggenerator/LogGeneratorSourceTest.java index 180fcae952..28d58f8cff 100644 --- a/data-prepper-plugins/log-generator-source/src/test/java/org/opensearch/dataprepper/plugins/source/loggenerator/LogGeneratorSourceTest.java +++ b/data-prepper-plugins/log-generator-source/src/test/java/org/opensearch/dataprepper/plugins/source/loggenerator/LogGeneratorSourceTest.java @@ -6,36 +6,37 @@ package org.opensearch.dataprepper.plugins.source.loggenerator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.loggenerator.LogGeneratorSourceConfig.INFINITE_LOG_COUNT; @@ -50,6 +51,9 @@ public class LogGeneratorSourceTest { private LogGeneratorSource logGeneratorSource; @Mock private PluginModel mockLogPluginModel; + @Mock + private Buffer buffer; + @BeforeEach public void setup() { when(sourceConfig.getLogType()).thenReturn(mockLogPluginModel); @@ -79,21 +83,23 @@ private LogGeneratorSource createObjectUnderTest() { @Test void GIVEN_logGeneratorSourceAndBlockingBuffer_WHEN_noLimit_THEN_keepsWritingToBufferUntilStopped() - throws InterruptedException, TimeoutException { + throws TimeoutException { logGeneratorSource = createObjectUnderTest(); - BlockingBuffer> spyBuffer = spy(new BlockingBuffer>("SamplePipeline")); - Duration interval = Duration.ofMillis(100); lenient().when(sourceConfig.getInterval()).thenReturn(interval); lenient().when(sourceConfig.getCount()).thenReturn(INFINITE_LOG_COUNT); // no limit to log count - logGeneratorSource.start(spyBuffer); - Thread.sleep((long) (interval.toMillis() * 1.5)); - verify(spyBuffer, atLeast(1)).write(any(Record.class), anyInt()); - Thread.sleep((long) (interval.toMillis() * 0.7)); - verify(spyBuffer, atLeast(2)).write(any(Record.class), anyInt()); + logGeneratorSource.start(buffer); + await() + .atMost((long) (interval.toMillis() * 1.5), TimeUnit.MILLISECONDS) + .untilAsserted(() -> verify(buffer, atLeast(1)).write(any(Record.class), anyInt())); + verify(buffer, atLeast(1)).write(any(Record.class), anyInt()); + await() + .atMost((long) (interval.toMillis() * 1.5), TimeUnit.MILLISECONDS) + .untilAsserted(() -> verify(buffer, atLeast(1)).write(any(Record.class), anyInt())); + verify(buffer, atLeast(2)).write(any(Record.class), anyInt()); } @Test @@ -101,20 +107,21 @@ void GIVEN_logGeneratorSourceAndBlockingBuffer_WHEN_reachedLimit_THEN_stopsWriti throws InterruptedException, TimeoutException { logGeneratorSource = createObjectUnderTest(); - BlockingBuffer> spyBuffer = spy(new BlockingBuffer>("SamplePipeline")); - Duration interval = Duration.ofMillis(100); lenient().when(sourceConfig.getInterval()).thenReturn(interval); lenient().when(sourceConfig.getCount()).thenReturn(1); // max log count of 1 in logGeneratorSource - assertEquals(spyBuffer.isEmpty(), true); - logGeneratorSource.start(spyBuffer); - Thread.sleep((long) (interval.toMillis() * 1.1)); + verifyNoInteractions(buffer); - verify(spyBuffer, times(1)).write(any(Record.class), anyInt()); + logGeneratorSource.start(buffer); - Thread.sleep(interval.toMillis()); - verify(spyBuffer, times(1)).write(any(Record.class), anyInt()); + await() + .atMost(interval.multipliedBy(3)) + .untilAsserted(() -> verify(buffer, atLeast(1)).write(any(Record.class), anyInt())); + verify(buffer, times(1)).write(any(Record.class), anyInt()); + + Thread.sleep((long) (interval.toMillis() * 1.1)); + verify(buffer, times(1)).write(any(Record.class), anyInt()); } } \ No newline at end of file