Skip to content

Commit

Permalink
Attempt to reduce flakiness in RandomStringSourceTests by using await…
Browse files Browse the repository at this point in the history
…ility. Split tests into two. JUnit 5. (#1921) (#1964)

Signed-off-by: David Venable <[email protected]>
(cherry picked from commit 3d641f7)

Co-authored-by: David Venable <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and dlvenable authored Oct 24, 2022
1 parent e70aa12 commit 5dbbd5d
Showing 1 changed file with 39 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,61 @@

package org.opensearch.dataprepper.plugins.source;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.buffer.TestBuffer;
import org.junit.Test;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class RandomStringSourceTests {
class RandomStringSourceTests {

@Test
public void testPutRecord() throws InterruptedException {
final RandomStringSource randomStringSource =
new RandomStringSource();
private TestBuffer buffer;

@BeforeEach
void setUp() {
final Queue<Record<Event>> bufferQueue = new ConcurrentLinkedQueue<>();
final TestBuffer buffer = new TestBuffer(bufferQueue, 1);
buffer = new TestBuffer(bufferQueue, 1);
}

private RandomStringSource createObjectUnderTest() {
return new RandomStringSource();
}

@Test
void testPutRecord() {
final RandomStringSource randomStringSource = createObjectUnderTest();

randomStringSource.start(buffer);
await().atMost(3, TimeUnit.SECONDS)
.pollDelay(200, TimeUnit.MILLISECONDS)
.until(() -> buffer.size() > 0);
assertThat(buffer.size(), greaterThan(0));
}

@Test
void testStop() throws InterruptedException {
final RandomStringSource randomStringSource = createObjectUnderTest();
//Start source, and sleep for 1000 millis
randomStringSource.start(buffer);
Thread.sleep(1000);
await().atMost(3, TimeUnit.SECONDS)
.pollDelay(200, TimeUnit.MILLISECONDS)
.until(() -> buffer.size() > 0);
//Stop the source, and wait long enough that another message would be sent
//if the source was running
assertThat(buffer.size(), greaterThan(0));
Thread.sleep(1000);
randomStringSource.stop();
assertThat(buffer.size(), greaterThan(0));
Thread.sleep(200); // Ensure the other thread has time to finish writing.
final int sizeAfterCompletion = buffer.size();
Thread.sleep(1000);
assertThat(buffer.size(), equalTo(sizeAfterCompletion));
}

}

0 comments on commit 5dbbd5d

Please sign in to comment.