diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineExtensions.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineExtensions.java similarity index 91% rename from data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineExtensions.java rename to data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineExtensions.java index 30212d5203..68e5365a3e 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/PipelineExtensions.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineExtensions.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.parser.model; +package org.opensearch.dataprepper.model.configuration; import com.fasterxml.jackson.annotation.JsonAnyGetter; import com.fasterxml.jackson.annotation.JsonAnySetter; diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java index ca75987998..aa2e9c9c24 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java @@ -11,7 +11,9 @@ import com.fasterxml.jackson.annotation.JsonGetter; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSetter; +import com.fasterxml.jackson.annotation.Nulls; import java.util.HashMap; import java.util.Map; @@ -28,6 +30,11 @@ public class PipelinesDataFlowModel { @JsonInclude(JsonInclude.Include.NON_NULL) private DataPrepperVersion version; + @JsonProperty("pipeline_configurations") + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonSetter(nulls = Nulls.SKIP) + private PipelineExtensions pipelineExtensions; + @JsonAnySetter private Map pipelines = new HashMap<>(); @@ -42,6 +49,12 @@ public PipelinesDataFlowModel(final Map pipelines) { this.pipelines = pipelines; } + public PipelinesDataFlowModel(final PipelineExtensions pipelineExtensions, + final Map pipelines) { + this.pipelineExtensions = pipelineExtensions; + this.pipelines = pipelines; + } + public PipelinesDataFlowModel(final DataPrepperVersion version, final Map pipelines) { this.version = version; this.pipelines = pipelines; @@ -62,6 +75,10 @@ public DataPrepperVersion getDataPrepperVersion() { return version; } + public PipelineExtensions getPipelineExtensions() { + return pipelineExtensions; + } + @JsonSetter public void setVersion(final String version) { this.version = DataPrepperVersion.parse(version); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java index a177158201..cbec310354 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModelTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasKey; import static org.junit.jupiter.api.Assertions.assertAll; @@ -34,6 +35,7 @@ class PipelinesDataFlowModelTest { private static final String RESOURCE_PATH_WITH_ROUTES = "/pipelines_data_flow_routes.yaml"; private static final String RESOURCE_PATH_WITH_SHORT_HAND_VERSION = "/pipeline_with_short_hand_version.yaml"; private static final String RESOURCE_PATH_WITH_VERSION = "/pipeline_with_version.yaml"; + private static final String RESOURCE_PATH_WITH_EXTENSION = "/pipeline_with_extension.yaml"; private ObjectMapper objectMapper; @BeforeEach @@ -87,6 +89,28 @@ void testSerializing_PipelinesDataFlowModel_with_Version() throws JsonProcessing assertThat(serializedString, equalTo(expectedYaml)); } + @Test + void testSerializing_PipelinesDataFlowModel_skip_null_pipelineExtensions() throws JsonProcessingException { + String pipelineName = "test-pipeline"; + + final PluginModel source = new PluginModel("testSource", (Map) null); + final List processors = Collections.singletonList(new PluginModel("testProcessor", (Map) null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); + final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50); + + final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel( + (PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel)); + + final String serializedString = objectMapper.writeValueAsString(pipelinesDataFlowModel); + + InputStream inputStream = this.getClass().getResourceAsStream(RESOURCE_PATH); + + final String expectedYaml = PluginModelTests.convertInputStreamToString(inputStream); + + assertThat(serializedString, notNullValue()); + assertThat(serializedString, equalTo(expectedYaml)); + } + @Test void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_and_workers_and_route() throws JsonProcessingException { String pipelineName = "test-pipeline"; @@ -118,6 +142,7 @@ void deserialize_PipelinesDataFlowModel() throws IOException { final String pipelineName = "test-pipeline"; assertThat(actualModel, notNullValue()); + assertThat(actualModel.getPipelineExtensions(), nullValue()); assertThat(actualModel.getPipelines(), notNullValue()); assertThat(actualModel.getPipelines().size(), equalTo(1)); assertThat(actualModel.getPipelines(), hasKey(pipelineName)); @@ -247,4 +272,15 @@ void deserialize_PipelinesDataFlowModel_with_shorthand_version() throws IOExcept assertThat(version.getMajorVersion(), is(equalTo(2))); assertThat(version.getMinorVersion(), is(equalTo(Optional.empty()))); } + + @Test + void deserialize_PipelinesDataFlowModel_with_extension() throws IOException { + final InputStream inputStream = this.getClass().getResourceAsStream(RESOURCE_PATH_WITH_EXTENSION); + + final PipelinesDataFlowModel actualModel = objectMapper.readValue(inputStream, PipelinesDataFlowModel.class); + + assertThat(actualModel, notNullValue()); + assertThat(actualModel.getPipelineExtensions(), notNullValue()); + assertThat(actualModel.getPipelineExtensions().getExtensionMap().containsKey("test_extension"), is(true)); + } } diff --git a/data-prepper-api/src/test/resources/pipeline_with_extension.yaml b/data-prepper-api/src/test/resources/pipeline_with_extension.yaml new file mode 100644 index 0000000000..a622093262 --- /dev/null +++ b/data-prepper-api/src/test/resources/pipeline_with_extension.yaml @@ -0,0 +1,11 @@ +pipeline_configurations: + test_extension: +test-pipeline: + source: + testSource: null + processor: + - testProcessor: null + sink: + - testSink: null + workers: 8 + delay: 50 \ No newline at end of file diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java index c451d46c49..aaf1aa924c 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import org.opensearch.dataprepper.model.configuration.PipelineExtensions; import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.slf4j.Logger; @@ -102,6 +103,16 @@ private PipelinesDataFlowModel mergePipelinesDataModels( Map.Entry::getKey, Map.Entry::getValue )); - return new PipelinesDataFlowModel(pipelinesDataFlowModelMap); + final List pipelineExtensionsList = pipelinesDataFlowModels.stream() + .map(PipelinesDataFlowModel::getPipelineExtensions) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (pipelineExtensionsList.size() > 1 || + (pipelineExtensionsList.size() == 1 && pipelinesDataFlowModels.size() > 1)) { + throw new ParseException( + "pipeline_configurations and definition must all be defined in a single YAML file if pipeline_configurations is configured."); + } + return pipelineExtensionsList.isEmpty() ? new PipelinesDataFlowModel(pipelinesDataFlowModelMap) : + new PipelinesDataFlowModel(pipelineExtensionsList.get(0), pipelinesDataFlowModelMap); } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java index adccb80120..9ddaf10eeb 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/model/DataPrepperConfiguration.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSetter; import com.fasterxml.jackson.annotation.Nulls; +import org.opensearch.dataprepper.model.configuration.PipelineExtensions; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.parser.config.MetricTagFilter; import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration; diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverter.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverter.java index ecf5633c2e..e5cbb46134 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverter.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverter.java @@ -4,29 +4,27 @@ import jakarta.validation.ConstraintViolation; import jakarta.validation.Validator; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; -import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import javax.inject.Inject; import javax.inject.Named; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @Named public class ExtensionPluginConfigurationConverter { - private final DataPrepperConfiguration dataPrepperConfiguration; + private final ExtensionPluginConfigurationResolver extensionPluginConfigurationResolver; private final ObjectMapper objectMapper; private final Validator validator; @Inject - public ExtensionPluginConfigurationConverter(final DataPrepperConfiguration dataPrepperConfiguration, - final Validator validator, - @Named("pluginConfigObjectMapper") - final ObjectMapper objectMapper) { - this.dataPrepperConfiguration = dataPrepperConfiguration; + public ExtensionPluginConfigurationConverter( + final ExtensionPluginConfigurationResolver extensionPluginConfigurationResolver, + final Validator validator, + @Named("pluginConfigObjectMapper") + final ObjectMapper objectMapper) { + this.extensionPluginConfigurationResolver = extensionPluginConfigurationResolver; this.objectMapper = objectMapper; this.validator = validator; } @@ -35,11 +33,8 @@ public Object convert(final Class extensionPluginConfigurationType, final Str Objects.requireNonNull(extensionPluginConfigurationType); Objects.requireNonNull(rootKey); - final Map extensionProperties = dataPrepperConfiguration.getPipelineExtensions() == null? - new HashMap<>() : dataPrepperConfiguration.getPipelineExtensions().getExtensionMap(); - final Object configuration = convertSettings(extensionPluginConfigurationType, - extensionProperties.get(rootKey)); + extensionPluginConfigurationResolver.getExtensionMap().get(rootKey)); final Set> constraintViolations = configuration == null ? Collections.emptySet() : validator.validate(configuration); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationResolver.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationResolver.java new file mode 100644 index 0000000000..c7688d5e4e --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationResolver.java @@ -0,0 +1,29 @@ +package org.opensearch.dataprepper.plugin; + +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; +import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; + +import javax.inject.Inject; +import javax.inject.Named; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@Named +public class ExtensionPluginConfigurationResolver { + private final Map extensionMap; + + @Inject + public ExtensionPluginConfigurationResolver(final DataPrepperConfiguration dataPrepperConfiguration, + final PipelinesDataFlowModel pipelinesDataFlowModel) { + extensionMap = dataPrepperConfiguration.getPipelineExtensions() == null? + new HashMap<>() : new HashMap<>(dataPrepperConfiguration.getPipelineExtensions().getExtensionMap()); + if (pipelinesDataFlowModel.getPipelineExtensions() != null) { + extensionMap.putAll(pipelinesDataFlowModel.getPipelineExtensions().getExtensionMap()); + } + } + + public Map getExtensionMap() { + return Collections.unmodifiableMap(extensionMap); + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java index c638463a10..958a3331b3 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java @@ -36,6 +36,7 @@ public class TestDataProvider { public static final Integer DEFAULT_READ_BATCH_DELAY = 3_000; public static final Integer TEST_DELAY = 3_000; public static final String VALID_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/valid_multiple_pipeline_configuration.yml"; + public static final String VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSIONS = "src/test/resources/valid_pipeline_configuration_with_extensions.yml"; public static final String VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE = "src/test/resources/single_pipeline_valid_empty_source_plugin_settings.yml"; public static final String CONNECTED_PIPELINE_ROOT_SOURCE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_root_source.yml"; public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_child_pipeline.yml"; @@ -44,6 +45,8 @@ public class TestDataProvider { public static final String MISSING_NAME_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_name_multiple_pipeline_configuration.yml"; public static final String MISSING_PIPELINE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_pipeline_multiple_pipeline_configuration.yml"; public static final String MULTI_FILE_PIPELINE_DIRECTOTRY = "src/test/resources/multi-pipelines"; + public static final String MULTI_FILE_PIPELINE_WITH_DISTRIBUTED_PIPELINE_CONFIGURATIONS_DIRECTOTRY = "src/test/resources/multi-pipelines-distributed-pipeline-configurations"; + public static final String MULTI_FILE_PIPELINE_WITH_SINGLE_PIPELINE_CONFIGURATIONS_DIRECTOTRY = "src/test/resources/multi-pipelines-single-pipeline-configurations"; public static final String SINGLE_FILE_PIPELINE_DIRECTOTRY = "src/test/resources/single-pipeline"; public static final String EMPTY_PIPELINE_DIRECTOTRY = "src/test/resources/no-pipelines"; public static final String VALID_MULTIPLE_SINKS_CONFIG_FILE = "src/test/resources/valid_multiple_sinks.yml"; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java index 3e117f8204..ce44b298fd 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java @@ -61,6 +61,8 @@ class PipelineTransformerTests { private PeerForwarderProvider peerForwarderProvider; + @Mock + private PipelinesDataFlowModel pipelinesDataFlowModel; @Mock private RouterFactory routerFactory; @@ -99,6 +101,7 @@ void setUp() { coreContext.scan(DefaultPluginFactory.class.getPackage().getName()); coreContext.registerBean(DataPrepperConfiguration.class, () -> dataPrepperConfiguration); + coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel); coreContext.refresh(); pluginFactory = coreContext.getBean(DefaultPluginFactory.class); } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java index a5a13c0753..bab8b705bb 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java @@ -1,11 +1,16 @@ package org.opensearch.dataprepper.parser; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.dataprepper.TestDataProvider; import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; +import java.util.Map; + import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -20,6 +25,19 @@ void parseConfiguration_with_multiple_valid_pipelines() { equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); } + @Test + void parseConfiguration_with_valid_pipelines_and_extensions() { + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(TestDataProvider.VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSIONS); + final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); + assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), + equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); + assertThat(actualPipelinesDataFlowModel.getPipelineExtensions(), notNullValue()); + assertThat(actualPipelinesDataFlowModel.getPipelineExtensions().getExtensionMap(), equalTo( + Map.of("test_extension", Map.of("test_attribute", "test_string_1")) + )); + } + @Test void parseConfiguration_with_incompatible_version_should_throw() { final PipelinesDataflowModelParser pipelinesDataflowModelParser = @@ -50,6 +68,22 @@ void parseConfiguration_from_directory_with_multiple_files_creates_the_correct_m assertThat(actualPipelinesDataFlowModel.getDataPrepperVersion(), nullValue()); } + @ParameterizedTest + @ValueSource(strings = { + TestDataProvider.MULTI_FILE_PIPELINE_WITH_DISTRIBUTED_PIPELINE_CONFIGURATIONS_DIRECTOTRY, + TestDataProvider.MULTI_FILE_PIPELINE_WITH_SINGLE_PIPELINE_CONFIGURATIONS_DIRECTOTRY + }) + void parseConfiguration_from_directory_with_multiple_files_and_pipeline_extensions_should_throw() { + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser( + TestDataProvider.MULTI_FILE_PIPELINE_WITH_DISTRIBUTED_PIPELINE_CONFIGURATIONS_DIRECTOTRY); + final ParseException actualException = assertThrows( + ParseException.class, pipelinesDataflowModelParser::parseConfiguration); + assertThat(actualException.getMessage(), equalTo( + "pipeline_configurations and definition must all be defined in a single YAML file " + + "if pipeline_configurations is configured.")); + } + @Test void parseConfiguration_from_directory_with_single_file_creates_the_correct_model() { final PipelinesDataflowModelParser pipelinesDataflowModelParser = diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java index dea4e45fa0..ca43a31e48 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java @@ -8,6 +8,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; @@ -34,6 +35,8 @@ */ @ExtendWith(MockitoExtension.class) class DefaultPluginFactoryIT { + @Mock + private PipelinesDataFlowModel pipelinesDataFlowModel; @Mock private DataPrepperConfiguration dataPrepperConfiguration; private String pluginName; @@ -61,6 +64,7 @@ private DefaultPluginFactory createObjectUnderTest() { coreContext.scan(DefaultPluginFactory.class.getPackage().getName()); coreContext.register(PluginBeanFactoryProvider.class); coreContext.registerBean(DataPrepperConfiguration.class, () -> dataPrepperConfiguration); + coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel); coreContext.refresh(); return coreContext.getBean(DefaultPluginFactory.class); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverterTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverterTest.java index 3b49a8daf6..6d28bf04d7 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverterTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationConverterTest.java @@ -9,9 +9,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.parser.model.PipelineExtensions; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; -import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; import org.opensearch.dataprepper.plugins.test.TestExtension; import org.opensearch.dataprepper.plugins.test.TestExtensionConfig; @@ -35,9 +33,7 @@ class ExtensionPluginConfigurationConverterTest { @Mock private Validator validator; @Mock - private DataPrepperConfiguration dataPrepperConfiguration; - @Mock - private PipelineExtensions pipelineExtensions; + private ExtensionPluginConfigurationResolver extensionPluginConfigurationResolver; @Mock private ConstraintViolation constraintViolation; @@ -47,7 +43,7 @@ class ExtensionPluginConfigurationConverterTest { @BeforeEach void setUp() { objectUnderTest = new ExtensionPluginConfigurationConverter( - dataPrepperConfiguration, validator, objectMapper); + extensionPluginConfigurationResolver, validator, objectMapper); } @Test @@ -65,10 +61,9 @@ void convert_with_null_rootKey_should_throw() { @Test void convert_with_test_extension_with_config() { when(validator.validate(any())).thenReturn(Collections.emptySet()); - when(dataPrepperConfiguration.getPipelineExtensions()).thenReturn(pipelineExtensions); final String rootKey = "test_extension"; final String testValue = "test_value"; - when(pipelineExtensions.getExtensionMap()).thenReturn(Map.of( + when(extensionPluginConfigurationResolver.getExtensionMap()).thenReturn(Map.of( rootKey, Map.of("test_attribute", testValue) )); final Object testExtensionConfig = objectUnderTest.convert(TestExtensionConfig.class, rootKey); @@ -78,18 +73,17 @@ void convert_with_test_extension_with_config() { @Test void convert_with_null_rootKey_value_should_return_null() { - when(dataPrepperConfiguration.getPipelineExtensions()).thenReturn(pipelineExtensions); final String rootKey = "test_extension"; - when(pipelineExtensions.getExtensionMap()).thenReturn(Collections.emptyMap()); + when(extensionPluginConfigurationResolver.getExtensionMap()).thenReturn(Collections.emptyMap()); final Object testExtensionConfig = objectUnderTest.convert(TestExtensionConfig.class, rootKey); assertThat(testExtensionConfig, nullValue()); } @Test void convert_should_throw_exception_when_there_are_constraint_violations() { - when(dataPrepperConfiguration.getPipelineExtensions()).thenReturn(pipelineExtensions); final String rootKey = UUID.randomUUID().toString(); - when(pipelineExtensions.getExtensionMap()).thenReturn(Map.of(rootKey, Collections.emptyMap())); + when(extensionPluginConfigurationResolver.getExtensionMap()).thenReturn( + Map.of(rootKey, Collections.emptyMap())); final String errorMessage = UUID.randomUUID().toString(); given(constraintViolation.getMessage()).willReturn(errorMessage); final String propertyPathString = UUID.randomUUID().toString(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationResolverTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationResolverTest.java new file mode 100644 index 0000000000..f0e77112e7 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ExtensionPluginConfigurationResolverTest.java @@ -0,0 +1,70 @@ +package org.opensearch.dataprepper.plugin; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.configuration.PipelineExtensions; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; +import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; + +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ExtensionPluginConfigurationResolverTest { + @Mock + private DataPrepperConfiguration dataPrepperConfiguration; + @Mock + private PipelinesDataFlowModel pipelinesDataFlowModel; + @Mock + private PipelineExtensions pipelineExtensions; + + private ExtensionPluginConfigurationResolver objectUnderTest; + + @Test + void testGetExtensionMap_defined_in_dataPrepperConfiguration_only() { + when(dataPrepperConfiguration.getPipelineExtensions()).thenReturn(pipelineExtensions); + final Map extensionMap = Map.of("test_extension", Map.of("test_key", "test_value")); + when(pipelineExtensions.getExtensionMap()).thenReturn(extensionMap); + when(pipelinesDataFlowModel.getPipelineExtensions()).thenReturn(null); + objectUnderTest = new ExtensionPluginConfigurationResolver(dataPrepperConfiguration, pipelinesDataFlowModel); + assertThat(objectUnderTest.getExtensionMap(), equalTo(extensionMap)); + } + + @Test + void testGetExtensionMap_defined_in_pipelinesDataFlowModel_only() { + when(dataPrepperConfiguration.getPipelineExtensions()).thenReturn(null); + when(pipelinesDataFlowModel.getPipelineExtensions()).thenReturn(pipelineExtensions); + final Map extensionMap = Map.of("test_extension", Map.of("test_key", "test_value")); + when(pipelineExtensions.getExtensionMap()).thenReturn(extensionMap); + objectUnderTest = new ExtensionPluginConfigurationResolver(dataPrepperConfiguration, pipelinesDataFlowModel); + assertThat(objectUnderTest.getExtensionMap(), equalTo(extensionMap)); + } + + @Test + void testGetExtensionMap_defined_in_both() { + when(dataPrepperConfiguration.getPipelineExtensions()).thenReturn(pipelineExtensions); + final Map dataPrepperConfigurationExtensionMap = Map.of( + "test_extension1", Map.of("test_key1", "test_value1"), + "test_extension2", Map.of("test_key1", "test_value1") + ); + when(pipelineExtensions.getExtensionMap()).thenReturn(dataPrepperConfigurationExtensionMap); + final PipelineExtensions pipelineExtensions1 = mock(PipelineExtensions.class); + when(pipelinesDataFlowModel.getPipelineExtensions()).thenReturn(pipelineExtensions1); + final Map pipelinesDataFlowModelExtensionMap = Map.of( + "test_extension1", Map.of("test_key2", "test_value2") + ); + when(pipelineExtensions1.getExtensionMap()).thenReturn(pipelinesDataFlowModelExtensionMap); + objectUnderTest = new ExtensionPluginConfigurationResolver(dataPrepperConfiguration, pipelinesDataFlowModel); + final Map expectedExtensionMap = Map.of( + "test_extension1", Map.of("test_key2", "test_value2"), + "test_extension2", Map.of("test_key1", "test_value1") + ); + assertThat(objectUnderTest.getExtensionMap(), equalTo(expectedExtensionMap)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/resources/multi-pipelines-distributed-pipeline-configurations/pipeline_configuration_with_test_extension1.yml b/data-prepper-core/src/test/resources/multi-pipelines-distributed-pipeline-configurations/pipeline_configuration_with_test_extension1.yml new file mode 100644 index 0000000000..8196622cb9 --- /dev/null +++ b/data-prepper-core/src/test/resources/multi-pipelines-distributed-pipeline-configurations/pipeline_configuration_with_test_extension1.yml @@ -0,0 +1,13 @@ +# this configuration file is solely for testing formatting +pipeline_configurations: + test_extension_1: + test_attribute: test_string +test-pipeline-1: + source: + file: + path: "/tmp/file-source.tmp" + buffer: + bounded_blocking: #to check non object nodes for plugins + sink: + - pipeline: + name: "test-pipeline-2" diff --git a/data-prepper-core/src/test/resources/multi-pipelines-distributed-pipeline-configurations/pipeline_configuration_with_test_extension2.yml b/data-prepper-core/src/test/resources/multi-pipelines-distributed-pipeline-configurations/pipeline_configuration_with_test_extension2.yml new file mode 100644 index 0000000000..1dc3e72570 --- /dev/null +++ b/data-prepper-core/src/test/resources/multi-pipelines-distributed-pipeline-configurations/pipeline_configuration_with_test_extension2.yml @@ -0,0 +1,11 @@ +# this configuration file is solely for testing formatting +pipeline_configurations: + test_extension_2: + test_attribute: test_string +test-pipeline-2: + source: + pipeline: + name: "test-pipeline-1" + sink: + - pipeline: + name: "test-pipeline-3" diff --git a/data-prepper-core/src/test/resources/multi-pipelines-single-pipeline-configurations/pipeline_configuration2.yml b/data-prepper-core/src/test/resources/multi-pipelines-single-pipeline-configurations/pipeline_configuration2.yml new file mode 100644 index 0000000000..22c0fe8c62 --- /dev/null +++ b/data-prepper-core/src/test/resources/multi-pipelines-single-pipeline-configurations/pipeline_configuration2.yml @@ -0,0 +1,8 @@ +# this configuration file is solely for testing formatting +test-pipeline-2: + source: + pipeline: + name: "test-pipeline-1" + sink: + - pipeline: + name: "test-pipeline-3" diff --git a/data-prepper-core/src/test/resources/multi-pipelines-single-pipeline-configurations/pipeline_configuration_with_test_extension.yml b/data-prepper-core/src/test/resources/multi-pipelines-single-pipeline-configurations/pipeline_configuration_with_test_extension.yml new file mode 100644 index 0000000000..4c8648a9a7 --- /dev/null +++ b/data-prepper-core/src/test/resources/multi-pipelines-single-pipeline-configurations/pipeline_configuration_with_test_extension.yml @@ -0,0 +1,13 @@ +# this configuration file is solely for testing formatting +pipeline_configurations: + test_extension: + test_attribute: test_string +test-pipeline-1: + source: + file: + path: "/tmp/file-source.tmp" + buffer: + bounded_blocking: #to check non object nodes for plugins + sink: + - pipeline: + name: "test-pipeline-2" diff --git a/data-prepper-core/src/test/resources/valid_pipeline_configuration_with_extensions.yml b/data-prepper-core/src/test/resources/valid_pipeline_configuration_with_extensions.yml new file mode 100644 index 0000000000..144bb262ec --- /dev/null +++ b/data-prepper-core/src/test/resources/valid_pipeline_configuration_with_extensions.yml @@ -0,0 +1,27 @@ +# this configuration file is solely for testing formatting +pipeline_configurations: + test_extension: + test_attribute: test_string_1 +test-pipeline-1: + source: + file: + path: "/tmp/file-source.tmp" + buffer: + bounded_blocking: #to check non object nodes for plugins + sink: + - pipeline: + name: "test-pipeline-2" +test-pipeline-2: + source: + pipeline: + name: "test-pipeline-1" + sink: + - pipeline: + name: "test-pipeline-3" +test-pipeline-3: + source: + pipeline: + name: "test-pipeline-2" + sink: + - file: + path: "/tmp/todelete.txt" \ No newline at end of file