Skip to content

Commit

Permalink
Refactor InMemorySource/Sink to a separate test framework package
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Oct 31, 2024
1 parent 5d28955 commit 2865c79
Show file tree
Hide file tree
Showing 20 changed files with 159 additions and 93 deletions.
1 change: 1 addition & 0 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ dependencies {
testImplementation libs.commons.lang3
testImplementation project(':data-prepper-test-event')
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-test-framework')
testImplementation project(':data-prepper-api').sourceSets.test.output
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;

import java.util.List;
Expand All @@ -29,9 +29,12 @@

public class Connected_SingleExtraSinkIT {
private static final String IN_MEMORY_IDENTIFIER = "Connected_SingleExtraSinkIT";
private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/";
private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml";
private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/connected/";
private static final String IN_MEMORY_IDENTIFIER_ENTRY_SINK = IN_MEMORY_IDENTIFIER + "_Entry";
private static final String IN_MEMORY_IDENTIFIER_EXIT_SINK = IN_MEMORY_IDENTIFIER + "_Exit";
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "connected/single-connection-extra-sink.yaml";
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "single-connection-extra-sink.yaml";
private DataPrepperTestRunner dataPrepperTestRunner;
private InMemorySourceAccessor inMemorySourceAccessor;
private InMemorySinkAccessor inMemorySinkAccessor;
Expand All @@ -40,6 +43,7 @@ public class Connected_SingleExtraSinkIT {
void setUp() {
dataPrepperTestRunner = DataPrepperTestRunner.builder()
.withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST)
.withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE)
.build();

dataPrepperTestRunner.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;

import java.util.Collections;
Expand All @@ -31,7 +31,10 @@

public class Connected_SingleIT {
private static final String IN_MEMORY_IDENTIFIER = "Connected_SingleIT";
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "connected/single-connection.yaml";
private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/";
private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml";
private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/connected/";
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "single-connection.yaml";
private DataPrepperTestRunner dataPrepperTestRunner;
private InMemorySourceAccessor inMemorySourceAccessor;
private InMemorySinkAccessor inMemorySinkAccessor;
Expand All @@ -40,6 +43,7 @@ public class Connected_SingleIT {
void setUp() {
dataPrepperTestRunner = DataPrepperTestRunner.builder()
.withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST)
.withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE)
.build();

dataPrepperTestRunner.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@

class CoreHttpServerIT {
private static final Logger log = LoggerFactory.getLogger(CoreHttpServerIT.class);
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "minimal-pipeline.yaml";
private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/";
private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml";
private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/";
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "minimal-pipeline.yaml";
private DataPrepperTestRunner dataPrepperTestRunner;

@BeforeEach
void setUp() {
dataPrepperTestRunner = DataPrepperTestRunner.builder()
.withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST)
.withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE)
.build();

dataPrepperTestRunner.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;

import java.util.Collections;
Expand All @@ -32,7 +32,10 @@
class MinimalPipelineIT {

private static final String IN_MEMORY_IDENTIFIER = "MinimalPipelineIT";
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "minimal-pipeline.yaml";
private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/";
private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml";
private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/";
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "minimal-pipeline.yaml";
private DataPrepperTestRunner dataPrepperTestRunner;
private InMemorySourceAccessor inMemorySourceAccessor;
private InMemorySinkAccessor inMemorySinkAccessor;
Expand All @@ -41,6 +44,7 @@ class MinimalPipelineIT {
void setUp() {
dataPrepperTestRunner = DataPrepperTestRunner.builder()
.withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST)
.withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE)
.build();

dataPrepperTestRunner.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;

import java.util.Collections;
Expand All @@ -34,7 +34,10 @@

class MultiWorkerPipelineIT {
private static final String IN_MEMORY_IDENTIFIER = "MultiWorkerPipelineIT";
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "multi-worker.yaml";
private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/";
private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml";
private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/";
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "multi-worker.yaml";
private static final int WORKER_THREADS = 4;
private static final int BATCH_SIZE = 10;
private DataPrepperTestRunner dataPrepperTestRunner;
Expand All @@ -45,6 +48,7 @@ class MultiWorkerPipelineIT {
void setUp() {
dataPrepperTestRunner = DataPrepperTestRunner.builder()
.withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST)
.withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE)
.build();

inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
Expand All @@ -32,24 +36,28 @@
@FixMethodOrder()
class PipelinesWithAcksIT {
private static final Logger LOG = LoggerFactory.getLogger(PipelinesWithAcksIT.class);
private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/";
private static final String PIPELINE_BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/";
private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml";
private static final String IN_MEMORY_IDENTIFIER = "PipelinesWithAcksIT";
private static final String SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST = "acknowledgements/simple-test.yaml";
private static final String TWO_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/two-pipelines-test.yaml";
private static final String TWO_PARALLEL_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/two-parallel-pipelines-test.yaml";
private static final String THREE_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipelines-test.yaml";
private static final String THREE_PIPELINES_WITH_ROUTE_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipeline-route-test.yaml";
private static final String THREE_PIPELINES_WITH_UNROUTED_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipeline-unrouted-test.yaml";
private static final String THREE_PIPELINES_WITH_DEFAULT_ROUTE_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipeline-route-default-test.yaml";
private static final String THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipelines-test-multi-sink.yaml";
private static final String ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-three-sinks.yaml";
private static final String ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-ack-expiry-test.yaml";
private static final String SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "simple-test.yaml";
private static final String TWO_PIPELINES_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "two-pipelines-test.yaml";
private static final String TWO_PARALLEL_PIPELINES_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "two-parallel-pipelines-test.yaml";
private static final String THREE_PIPELINES_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-pipelines-test.yaml";
private static final String THREE_PIPELINES_WITH_ROUTE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-pipeline-route-test.yaml";
private static final String THREE_PIPELINES_WITH_UNROUTED_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-pipeline-unrouted-test.yaml";
private static final String THREE_PIPELINES_WITH_DEFAULT_ROUTE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-pipeline-route-default-test.yaml";
private static final String THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-pipelines-test-multi-sink.yaml";
private static final String ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "one-pipeline-three-sinks.yaml";
private static final String ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "one-pipeline-ack-expiry-test.yaml";
private DataPrepperTestRunner dataPrepperTestRunner;
private InMemorySourceAccessor inMemorySourceAccessor;
private InMemorySinkAccessor inMemorySinkAccessor;

void setUp(String configFile) {
dataPrepperTestRunner = DataPrepperTestRunner.builder()
.withPipelinesDirectoryOrFile(configFile)
.withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE)
.build();

LOG.info("PipelinesWithAcksIT with config file {} started at {}", configFile, Instant.now());
Expand All @@ -64,11 +72,29 @@ void tearDown() {
dataPrepperTestRunner.stop();
}

private List<Record<Event>> createRecords(int numRecords, boolean withStatus) {
List<Record<Event>> records = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
final int max = 600;
final int min = 100;
int status = (int)(Math.random() * (max - min + 1) + min);
Map<String, Object> eventMap = (withStatus) ?
Map.of("message", UUID.randomUUID().toString(), "status", status) :
Map.of("message", UUID.randomUUID().toString());
final Event event = JacksonEvent.builder()
.withEventType("event")
.withData(eventMap)
.build();
records.add(new Record<>(event));
}
return records;
}

@Test
void simple_pipeline_with_single_record() {
setUp(SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST);
final int numRecords = 1;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -84,7 +110,7 @@ void simple_pipeline_with_single_record() {
void simple_pipeline_with_multiple_records() {
setUp(SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -99,7 +125,7 @@ void simple_pipeline_with_multiple_records() {
void two_pipelines_with_multiple_records() {
setUp(TWO_PIPELINES_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -114,7 +140,7 @@ void two_pipelines_with_multiple_records() {
void three_pipelines_with_multiple_records() {
setUp(THREE_PIPELINES_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -129,7 +155,7 @@ void three_pipelines_with_multiple_records() {
void three_pipelines_with_all_unrouted_records() {
setUp(THREE_PIPELINES_WITH_UNROUTED_CONFIGURATION_UNDER_TEST);
final int numRecords = 2;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -144,8 +170,8 @@ void three_pipelines_with_all_unrouted_records() {
@Test
void three_pipelines_with_route_and_multiple_records() {
setUp(THREE_PIPELINES_WITH_ROUTE_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);
final int numRecords = 10;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, true));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -161,7 +187,7 @@ void three_pipelines_with_default_route_and_multiple_records() {
setUp(THREE_PIPELINES_WITH_DEFAULT_ROUTE_CONFIGURATION_UNDER_TEST);
final int numRecords = 10;

inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, true));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -176,7 +202,7 @@ void three_pipelines_with_default_route_and_multiple_records() {
void two_parallel_pipelines_multiple_records() {
setUp(TWO_PARALLEL_PIPELINES_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -191,7 +217,7 @@ void two_parallel_pipelines_multiple_records() {
void three_pipelines_multi_sink_multiple_records() {
setUp(THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -206,7 +232,7 @@ void three_pipelines_multi_sink_multiple_records() {
void one_pipeline_three_sinks_multiple_records() {
setUp(ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -221,7 +247,7 @@ void one_pipeline_three_sinks_multiple_records() {
void one_pipeline_ack_expiry_multiple_records() {
setUp(ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false));

await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
Expand All @@ -236,7 +262,7 @@ void one_pipeline_ack_expiry_multiple_records() {
void one_pipeline_three_sinks_negative_ack_multiple_records() {
setUp(ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST);
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false));
inMemorySinkAccessor.setResult(false);

await().atMost(40000, TimeUnit.MILLISECONDS)
Expand Down
Loading

0 comments on commit 2865c79

Please sign in to comment.