Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into sns-sink-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
udaych20 authored Jul 26, 2023
2 parents 6a9ef69 + 548b5e0 commit 818a03f
Show file tree
Hide file tree
Showing 33 changed files with 946 additions and 324 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import java.util.UUID;

import static org.awaitility.Awaitility.await;

public class AbstractSinkTest {
@Test
public void testMetrics() {
Expand Down Expand Up @@ -51,8 +54,8 @@ public void testMetrics() {
Assert.assertEquals(1.0, MetricsTestUtil.getMeasurementFromList(elapsedTimeMeasurements, Statistic.COUNT).getValue(), 0);
Assert.assertTrue(MetricsTestUtil.isBetween(
MetricsTestUtil.getMeasurementFromList(elapsedTimeMeasurements, Statistic.TOTAL_TIME).getValue(),
0.5,
0.6));
0.2,
0.3));
Assert.assertEquals(abstractSink.getRetryThreadState(), null);
abstractSink.shutdown();
}
Expand All @@ -71,14 +74,8 @@ public void testSinkNotReady() {
// Do another intialize to make sure the sink is still not ready
abstractSink.initialize();
Assert.assertEquals(abstractSink.isReady(), false);
while (!abstractSink.isReady()) {
try {
Thread.sleep(1000);
} catch (Exception e) {}
}
try {
Thread.sleep(2000);
} catch (Exception e) {}
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
Assert.assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
abstractSink.shutdown();
}
Expand All @@ -92,7 +89,7 @@ public AbstractSinkImpl(PluginSetting pluginSetting) {
@Override
public void doOutput(Collection<Record<String>> records) {
try {
Thread.sleep(500);
Thread.sleep(200);
} catch (InterruptedException e) {

}
Expand Down Expand Up @@ -126,7 +123,7 @@ public AbstractSinkNotReadyImpl(PluginSetting pluginSetting) {
@Override
public void doOutput(Collection<Record<String>> records) {
try {
Thread.sleep(500);
Thread.sleep(100);
} catch (InterruptedException e) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -19,40 +18,33 @@
@ExtendWith(MockitoExtension.class)
public class SinkThreadTest {
@Mock
AbstractSink sink;
private AbstractSink sink;

SinkThread sinkThread;
private SinkThread sinkThread;

@Test
public void testSinkThread() {
when(sink.isReady()).thenReturn(true);
sinkThread = new SinkThread(sink, 5, 1000);
sinkThread = new SinkThread(sink, 5, 100);
sinkThread.run();
verify(sink, times(1)).isReady();
}

@Test
public void testSinkThread2() {
when(sink.isReady()).thenReturn(false);
sinkThread = new SinkThread(sink, 5, 1000);
sinkThread = new SinkThread(sink, 5, 100);
sinkThread.run();
verify(sink, times(6)).isReady();
try {
doAnswer((i) -> {
return null;
}).when(sink).doInitialize();
verify(sink, times(5)).doInitialize();
} catch (Exception e){}
verify(sink, times(5)).doInitialize();
when(sink.isReady()).thenReturn(false).thenReturn(true);
sinkThread.run();
verify(sink, times(8)).isReady();
when(sink.isReady()).thenReturn(false).thenReturn(true);
try {
lenient().doAnswer((i) -> {
throw new InterruptedException("Fake interrupt");
}).when(sink).doInitialize();
sinkThread.run();
verify(sink, times(7)).doInitialize();
} catch (Exception e){}
lenient().doAnswer((i) -> {
throw new InterruptedException("Fake interrupt");
}).when(sink).doInitialize();
sinkThread.run();
verify(sink, times(7)).doInitialize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

@ExtendWith(MockitoExtension.class)
public class AcknowledgementSetMonitorTests {
private static final int DEFAULT_WAIT_TIME_MS = 2000;
private static final int DEFAULT_WAIT_TIME_MS = 500;
@Mock
DefaultAcknowledgementSet acknowledgementSet1;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;

import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.doAnswer;
Expand All @@ -27,7 +29,7 @@

@ExtendWith(MockitoExtension.class)
class DefaultAcknowledgementSetManagerTests {
private static final Duration TEST_TIMEOUT_MS = Duration.ofMillis(1000);
private static final Duration TEST_TIMEOUT = Duration.ofMillis(400);
DefaultAcknowledgementSetManager acknowledgementSetManager;
private ExecutorService callbackExecutor;

Expand Down Expand Up @@ -61,49 +63,57 @@ void setup() {
lenient().when(event2.getEventHandle()).thenReturn(eventHandle2);

acknowledgementSetManager = createObjectUnderTest();
AcknowledgementSet acknowledgementSet1 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT_MS);
AcknowledgementSet acknowledgementSet1 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT);
acknowledgementSet1.add(event1);
acknowledgementSet1.add(event2);
acknowledgementSet1.complete();
}

DefaultAcknowledgementSetManager createObjectUnderTest() {
return new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(TEST_TIMEOUT_MS.toMillis() * 2));
return new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(TEST_TIMEOUT.toMillis() * 2));
}

@Test
void testBasic() throws InterruptedException {
void testBasic() {
acknowledgementSetManager.releaseEventReference(eventHandle2, true);
acknowledgementSetManager.releaseEventReference(eventHandle1, true);
Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5);
await().atMost(TEST_TIMEOUT.multipliedBy(5))
.untilAsserted(() -> {
assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0));
assertThat(result, equalTo(true));
});
assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0));
assertThat(result, equalTo(true));
}

@Test
void testExpirations() throws InterruptedException {
acknowledgementSetManager.releaseEventReference(eventHandle2, true);
Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5);
Thread.sleep(TEST_TIMEOUT.multipliedBy(5).toMillis());
assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0));
assertThat(result, equalTo(null));
}

@Test
void testMultipleAcknowledgementSets() throws InterruptedException {
void testMultipleAcknowledgementSets() {
event3 = mock(JacksonEvent.class);
doAnswer((i) -> {
eventHandle3 = i.getArgument(0);
return null;
}).when(event3).setEventHandle(any());
lenient().when(event3.getEventHandle()).thenReturn(eventHandle3);

AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT_MS);
AcknowledgementSet acknowledgementSet2 = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT);
acknowledgementSet2.add(event3);
acknowledgementSet2.complete();

acknowledgementSetManager.releaseEventReference(eventHandle2, true);
acknowledgementSetManager.releaseEventReference(eventHandle3, true);
Thread.sleep(TEST_TIMEOUT_MS.toMillis() * 5);
await().atMost(TEST_TIMEOUT.multipliedBy(5))
.untilAsserted(() -> {
assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0));
assertThat(result, equalTo(true));
});
assertThat(acknowledgementSetManager.getAcknowledgementSetMonitor().getSize(), equalTo(0));
assertThat(result, equalTo(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class PeerForwarderReceiveBufferTest {
private static final int TEST_BATCH_SIZE = 3;
private static final int TEST_BUFFER_SIZE = 13;
private static final int TEST_WRITE_TIMEOUT = 100;
private static final int TEST_BATCH_READ_TIMEOUT = 5_000;
private static final int TEST_BATCH_READ_TIMEOUT = 200;
private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
private static final String PIPELINE_NAME = UUID.randomUUID().toString();
private static final String PLUGIN_ID = UUID.randomUUID().toString();
Expand Down Expand Up @@ -204,7 +204,7 @@ void testNonZeroBatchDelayReturnsAllRecords() throws Exception {
final Collection<Record<String>> testRecords2 = generateBatchRecords(1);
EXECUTOR.submit(() -> {
try {
Thread.sleep(1000);
Thread.sleep(TEST_BATCH_READ_TIMEOUT / 5);
peerForwarderReceiveBuffer.writeAll(testRecords2, TEST_WRITE_TIMEOUT);
} catch (final Exception e) {
throw new RuntimeException(e);
Expand All @@ -229,7 +229,7 @@ void testZeroBatchDelayReturnsAvailableRecords() throws Exception {
final Collection<Record<String>> testRecords2 = generateBatchRecords(1);
EXECUTOR.submit(() -> {
try {
Thread.sleep(1000);
Thread.sleep(TEST_BATCH_READ_TIMEOUT / 5);
peerForwarderReceiveBuffer.writeAll(testRecords2, TEST_WRITE_TIMEOUT);
} catch (final Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private Collection<Record<Event>> getServerSideRecords(final PeerForwarderProvid
assertThat(pluginBufferMap, notNullValue());
final PeerForwarderReceiveBuffer<Record<Event>> receiveBuffer = pluginBufferMap.get(pluginId);

final Map.Entry<Collection<Record<Event>>, CheckpointState> bufferEntry = receiveBuffer.read(1000);
final Map.Entry<Collection<Record<Event>>, CheckpointState> bufferEntry = receiveBuffer.read(400);
return bufferEntry.getKey();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@
class RemotePeerForwarderTest {
private static final int TEST_BUFFER_CAPACITY = 20;
private static final int TEST_BATCH_SIZE = 20;
private static final int TEST_BATCH_DELAY = 3_000;
private static final int TEST_LOCAL_WRITE_TIMEOUT = 500;
private static final int TEST_TIMEOUT_IN_MILLIS = 500;
private static final int TEST_BATCH_DELAY = 800;
private static final int TEST_LOCAL_WRITE_TIMEOUT = 400;
private static final int TEST_TIMEOUT_IN_MILLIS = 400;
private static final int FORWARDING_BATCH_SIZE = 5;
private static final int FORWARDING_BATCH_QUEUE_DEPTH = 1;
private static final Duration FORWARDING_BATCH_TIMEOUT = Duration.of(3, ChronoUnit.SECONDS);
private static final Duration FORWARDING_BATCH_TIMEOUT = Duration.of(800, ChronoUnit.MILLIS);
private static final int PIPELINE_WORKER_THREADS = 3;
private static final String PIPELINE_NAME = UUID.randomUUID().toString();
private static final String PLUGIN_ID = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand All @@ -67,7 +68,7 @@
import static org.mockito.Mockito.when;

class PipelineTests {
private static final int TEST_READ_BATCH_TIMEOUT = 3000;
private static final int TEST_READ_BATCH_TIMEOUT = 500;
private static final int TEST_PROCESSOR_THREADS = 1;
private static final String TEST_PIPELINE_NAME = "test-pipeline";

Expand Down Expand Up @@ -168,9 +169,9 @@ void testPipelineStateWithProcessor() {

@Test
void testPipelineDelayedReady() throws InterruptedException {
final int delayTimeSeconds = 10;
final Duration delayTime = Duration.ofMillis(2000);
final Source<Record<String>> testSource = new TestSource();
final TestSink testSink = new TestSink(delayTimeSeconds);
final TestSink testSink = new TestSink(delayTime);
final DataFlowComponent<Sink> sinkDataFlowComponent = mock(DataFlowComponent.class);
final TestProcessor testProcessor = new TestProcessor(new PluginSetting("test_processor", new HashMap<>()));
when(sinkDataFlowComponent.getComponent()).thenReturn(testSink);
Expand All @@ -182,11 +183,11 @@ void testPipelineDelayedReady() throws InterruptedException {
Instant startTime = Instant.now();
testPipeline.execute();
assertFalse(testPipeline.isReady());
for (int i = 0; i < delayTimeSeconds + 2; i++) {
Thread.sleep(1000);
}
await().atMost(Duration.ofSeconds(2).plus(delayTime))
.pollInterval(Duration.ofMillis(200))
.until(testPipeline::isReady);
assertTrue(testPipeline.isReady());
assertThat(Duration.between(startTime, Instant.now()), greaterThanOrEqualTo(Duration.ofSeconds(delayTimeSeconds)));
assertThat(Duration.between(startTime, Instant.now()), greaterThanOrEqualTo(delayTime));
assertThat("Pipeline isStopRequested is expected to be false", testPipeline.isStopRequested(), is(false));
testPipeline.shutdown();
assertThat("Pipeline isStopRequested is expected to be true", testPipeline.isStopRequested(), is(true));
Expand All @@ -196,9 +197,9 @@ void testPipelineDelayedReady() throws InterruptedException {

@Test
void testPipelineDelayedReadyShutdownBeforeReady() throws InterruptedException {
final int delayTimeSeconds = 10;
final Duration delayTime = Duration.ofSeconds(2);
final Source<Record<String>> testSource = new TestSource();
final TestSink testSink = new TestSink(delayTimeSeconds);
final TestSink testSink = new TestSink(delayTime);
final DataFlowComponent<Sink> sinkDataFlowComponent = mock(DataFlowComponent.class);
final TestProcessor testProcessor = new TestProcessor(new PluginSetting("test_processor", new HashMap<>()));
when(sinkDataFlowComponent.getComponent()).thenReturn(testSink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -29,11 +30,11 @@ public TestSink() {
this.ready = true;
}

public TestSink(int readyAfterSecs) {
public TestSink(Duration readyAfter) {
this.ready = false;
this.failSinkForTest = false;
this.collectedRecords = new ArrayList<>();
this.readyTime = Instant.now().plusSeconds(readyAfterSecs);
this.readyTime = Instant.now().plus(readyAfter);
}

public TestSink(boolean failSinkForTest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private AggregateProcessor createObjectUnderTest() {
return new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
}

@RepeatedTest(value = 10)
@RepeatedTest(value = 2)
void aggregateWithNoConcludingGroupsReturnsExpectedResult() throws InterruptedException {
aggregateAction = new RemoveDuplicatesAggregateAction();
when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
Expand Down Expand Up @@ -260,7 +260,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException {
}

@ParameterizedTest
@ValueSource(doubles = {5.0, 15.0, 33.0, 55.0, 70.0, 85.0, 92.0, 99.0})
@ValueSource(doubles = {5.0, 15.0, 55.0, 92.0, 99.0})
void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedException, NoSuchFieldException, IllegalAccessException {
PercentSamplerAggregateActionConfig percentSamplerAggregateActionConfig = new PercentSamplerAggregateActionConfig();
setField(PercentSamplerAggregateActionConfig.class, percentSamplerAggregateActionConfig, "percent", testPercent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
final Event event = record.getData();

final String message = event.get(config.getSource(), String.class);

if (Objects.isNull(message)) {
continue;
}

final boolean userDidSpecifyHeaderEventKey = Objects.nonNull(config.getColumnNamesSourceKey());
final boolean thisEventHasHeaderSource = userDidSpecifyHeaderEventKey && event.containsKey(config.getColumnNamesSourceKey());

Expand Down
Loading

0 comments on commit 818a03f

Please sign in to comment.