ENH: support pipeline extensions in pipeline config #3299

Merged 30 commits on Sep 15, 2023

Commits
9b59ec1
ADD: initial implementation on injecting extension config
chenqi0805 May 31, 2023
43a52d7
ENH: context change
chenqi0805 Jun 2, 2023
fb9af26
TST: infra
chenqi0805 Jun 2, 2023
f906fce
Merge branch 'main' into enh/allow-config-in-extension
chenqi0805 Jun 5, 2023
a3b4c24
REF: parsing extension from pipeline config
chenqi0805 Jun 9, 2023
2049066
MAINT: remove test code
chenqi0805 Jun 9, 2023
250ae6f
MAINT: fix test cases
chenqi0805 Jun 9, 2023
e143fbb
Merge branch 'main' into enh/allow-config-in-extension
chenqi0805 Aug 21, 2023
79f20cf
MAINT: handle null value on config in the ExtensionPlugin
chenqi0805 Aug 22, 2023
50f8e0d
FIX: null validation
chenqi0805 Aug 22, 2023
bd2c0a6
REF: pipeline_extensions in data-prepper-config
chenqi0805 Aug 24, 2023
e1dec5b
MAINT: revert changes on refactoring pipeline_extensions
chenqi0805 Aug 28, 2023
51df8e3
MAINT: revert pipeline_extensions into data-prepper-config
chenqi0805 Aug 29, 2023
d545390
REF: pipeline_extensions -> extensions
chenqi0805 Aug 29, 2023
bab025d
MAINT: refactor ObjectMapper
chenqi0805 Aug 29, 2023
d7a057d
MAINT: unused imports
chenqi0805 Aug 29, 2023
1081eb3
Merge branch 'main' into enh/allow-config-in-extension
chenqi0805 Aug 30, 2023
09c2146
REF: merge dataflow model
chenqi0805 Aug 30, 2023
3a84ff3
MAINT: resolve merging conflict
chenqi0805 Aug 30, 2023
6527e07
MAINT: import fix
chenqi0805 Aug 30, 2023
b7c19f3
FIX: do not allow duplicate pipelines
chenqi0805 Aug 30, 2023
7b29f73
MAINT: revert
chenqi0805 Aug 30, 2023
0204d4d
ENH: resolve pipeline extension configurations
chenqi0805 Aug 31, 2023
0c2c6dc
Merge branch 'main' into enh/support-pipeline-extensions-in-pipeline-…
chenqi0805 Aug 31, 2023
c785db6
FIX: test cases
chenqi0805 Aug 31, 2023
9153a21
MAINT: initial change on merging pipelineExtensions
chenqi0805 Aug 31, 2023
88942aa
TST: test cases on parsing pipeline_extensions
chenqi0805 Sep 1, 2023
6955a3b
FIX: no pipeline-extensions
chenqi0805 Sep 1, 2023
1722ad9
MAINT: test coverage
chenqi0805 Sep 1, 2023
4631943
MAINT: rename pipeline_extensions to pipeline_configurations
chenqi0805 Sep 14, 2023
@@ -1,4 +1,4 @@
-package org.opensearch.dataprepper.parser.model;
+package org.opensearch.dataprepper.model.configuration;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
@@ -11,7 +11,9 @@
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;

import java.util.HashMap;
import java.util.Map;
@@ -28,6 +30,11 @@ public class PipelinesDataFlowModel {
@JsonInclude(JsonInclude.Include.NON_NULL)
private DataPrepperVersion version;

@JsonProperty("pipeline_extensions")
Review comment (Member):
In #2590 this is proposed as pipeline_configurations. I tend to think that we should keep this original name.

There are a couple of reasons:

  1. Extensions are intended for extending Data Prepper. Thus, extensions may need to run before parsing a pipeline configuration.
  2. The available options in pipeline_configurations should be somewhat different from the extensions. Not everything in extensions should be configurable in the pipeline.
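To make the two locations under discussion concrete, here is a minimal sketch; the `geoip_service` extension name and its settings are hypothetical, not taken from this PR:

```yaml
# data-prepper-config.yaml -- extensions configure Data Prepper itself,
# so they may need to load before any pipeline configuration is parsed.
pipeline_extensions:
  geoip_service:            # hypothetical extension name
    cache_size: 1000

# pipelines.yaml -- under the proposed name, pipeline-scoped settings
# would live in pipeline_configurations, a deliberately narrower surface.
pipeline_configurations:
  geoip_service:
    cache_size: 500
my-pipeline:
  source:
    stdin: null
  sink:
    - stdout: null
```

This naming is what the final commit in the PR adopts ("rename pipeline_extensions to pipeline_configurations").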

Reply (Collaborator, Author):
Will rename this part.

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSetter(nulls = Nulls.SKIP)
private PipelineExtensions pipelineExtensions;

@JsonAnySetter
private Map<String, PipelineModel> pipelines = new HashMap<>();

@@ -42,6 +49,12 @@ public PipelinesDataFlowModel(final Map<String, PipelineModel> pipelines) {
this.pipelines = pipelines;
}

public PipelinesDataFlowModel(final PipelineExtensions pipelineExtensions,
final Map<String, PipelineModel> pipelines) {
this.pipelineExtensions = pipelineExtensions;
this.pipelines = pipelines;
}

public PipelinesDataFlowModel(final DataPrepperVersion version, final Map<String, PipelineModel> pipelines) {
this.version = version;
this.pipelines = pipelines;
@@ -62,6 +75,10 @@ public DataPrepperVersion getDataPrepperVersion() {
return version;
}

public PipelineExtensions getPipelineExtensions() {
return pipelineExtensions;
}

@JsonSetter
public void setVersion(final String version) {
this.version = DataPrepperVersion.parse(version);
@@ -23,6 +23,7 @@
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.jupiter.api.Assertions.assertAll;
@@ -34,6 +35,7 @@ class PipelinesDataFlowModelTest {
private static final String RESOURCE_PATH_WITH_ROUTES = "/pipelines_data_flow_routes.yaml";
private static final String RESOURCE_PATH_WITH_SHORT_HAND_VERSION = "/pipeline_with_short_hand_version.yaml";
private static final String RESOURCE_PATH_WITH_VERSION = "/pipeline_with_version.yaml";
private static final String RESOURCE_PATH_WITH_EXTENSION = "/pipeline_with_extension.yaml";
private ObjectMapper objectMapper;

@BeforeEach
@@ -87,6 +89,28 @@ void testSerializing_PipelinesDataFlowModel_with_Version() throws JsonProcessing
assertThat(serializedString, equalTo(expectedYaml));
}

@Test
void testSerializing_PipelinesDataFlowModel_skip_null_pipelineExtensions() throws JsonProcessingException {
String pipelineName = "test-pipeline";

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(
(PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel));

final String serializedString = objectMapper.writeValueAsString(pipelinesDataFlowModel);

InputStream inputStream = this.getClass().getResourceAsStream(RESOURCE_PATH);

final String expectedYaml = PluginModelTests.convertInputStreamToString(inputStream);

assertThat(serializedString, notNullValue());
assertThat(serializedString, equalTo(expectedYaml));
}

@Test
void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_and_workers_and_route() throws JsonProcessingException {
String pipelineName = "test-pipeline";
@@ -118,6 +142,7 @@ void deserialize_PipelinesDataFlowModel() throws IOException {
final String pipelineName = "test-pipeline";

assertThat(actualModel, notNullValue());
assertThat(actualModel.getPipelineExtensions(), nullValue());
assertThat(actualModel.getPipelines(), notNullValue());
assertThat(actualModel.getPipelines().size(), equalTo(1));
assertThat(actualModel.getPipelines(), hasKey(pipelineName));
@@ -247,4 +272,15 @@ void deserialize_PipelinesDataFlowModel_with_shorthand_version() throws IOExcept
assertThat(version.getMajorVersion(), is(equalTo(2)));
assertThat(version.getMinorVersion(), is(equalTo(Optional.empty())));
}

@Test
void deserialize_PipelinesDataFlowModel_with_extension() throws IOException {
final InputStream inputStream = this.getClass().getResourceAsStream(RESOURCE_PATH_WITH_EXTENSION);

final PipelinesDataFlowModel actualModel = objectMapper.readValue(inputStream, PipelinesDataFlowModel.class);

assertThat(actualModel, notNullValue());
assertThat(actualModel.getPipelineExtensions(), notNullValue());
assertThat(actualModel.getPipelineExtensions().getExtensionMap().containsKey("test_extension"), is(true));
}
}
@@ -0,0 +1,11 @@
pipeline_extensions:
  test_extension:
test-pipeline:
  source:
    testSource: null
  processor:
    - testProcessor: null
  sink:
    - testSink: null
  workers: 8
  delay: 50
@@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.slf4j.Logger;
@@ -102,6 +103,16 @@ private PipelinesDataFlowModel mergePipelinesDataModels(
Map.Entry::getKey,
Map.Entry::getValue
));
-        return new PipelinesDataFlowModel(pipelinesDataFlowModelMap);
+        final List<PipelineExtensions> pipelineExtensionsList = pipelinesDataFlowModels.stream()
+                .map(PipelinesDataFlowModel::getPipelineExtensions)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+        if (pipelineExtensionsList.size() > 1 ||
+                (pipelineExtensionsList.size() == 1 && pipelinesDataFlowModels.size() > 1)) {
+            throw new ParseException(
+                    "Pipeline extensions and configurations must all be defined in a single YAML file if pipeline_extensions is configured.");
+        }
+        return pipelineExtensionsList.isEmpty() ? new PipelinesDataFlowModel(pipelinesDataFlowModelMap) :
+                new PipelinesDataFlowModel(pipelineExtensionsList.get(0), pipelinesDataFlowModelMap);
}
}
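The validation added to `mergePipelinesDataModels` means that, when pipeline-level extensions are used, all pipelines must come from a single YAML file. A hedged sketch of a directory layout that would now be rejected (file and pipeline names here are hypothetical):

```yaml
# pipelines/file-a.yaml -- declares pipeline_extensions
pipeline_extensions:
  test_extension:
    test_attribute: test_string_1
pipeline-a:
  source:
    testSource: null
  sink:
    - testSink: null

# pipelines/file-b.yaml -- any second file in the same directory causes
# merging to throw ParseException: "Pipeline extensions and configurations
# must all be defined in a single YAML file if pipeline_extensions is
# configured."
```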
@@ -11,6 +11,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.parser.config.MetricTagFilter;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration;
@@ -4,29 +4,27 @@
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validator;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;

import javax.inject.Inject;
import javax.inject.Named;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

@Named
public class ExtensionPluginConfigurationConverter {
-    private final DataPrepperConfiguration dataPrepperConfiguration;
+    private final ExtensionPluginConfigurationResolver extensionPluginConfigurationResolver;
private final ObjectMapper objectMapper;
private final Validator validator;

@Inject
-    public ExtensionPluginConfigurationConverter(final DataPrepperConfiguration dataPrepperConfiguration,
-                                                 final Validator validator,
-                                                 @Named("pluginConfigObjectMapper")
-                                                 final ObjectMapper objectMapper) {
-        this.dataPrepperConfiguration = dataPrepperConfiguration;
+    public ExtensionPluginConfigurationConverter(
+            final ExtensionPluginConfigurationResolver extensionPluginConfigurationResolver,
+            final Validator validator,
+            @Named("pluginConfigObjectMapper")
+            final ObjectMapper objectMapper) {
+        this.extensionPluginConfigurationResolver = extensionPluginConfigurationResolver;
this.objectMapper = objectMapper;
this.validator = validator;
}
@@ -35,11 +33,8 @@ public Object convert(final Class<?> extensionPluginConfigurationType, final Str
Objects.requireNonNull(extensionPluginConfigurationType);
Objects.requireNonNull(rootKey);

-        final Map<String, Object> extensionProperties = dataPrepperConfiguration.getPipelineExtensions() == null?
-                new HashMap<>() : dataPrepperConfiguration.getPipelineExtensions().getExtensionMap();
-
         final Object configuration = convertSettings(extensionPluginConfigurationType,
-                extensionProperties.get(rootKey));
+                extensionPluginConfigurationResolver.getExtensionMap().get(rootKey));

final Set<ConstraintViolation<Object>> constraintViolations = configuration == null ? Collections.emptySet() :
validator.validate(configuration);
@@ -0,0 +1,29 @@
package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;

import javax.inject.Inject;
import javax.inject.Named;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Named
public class ExtensionPluginConfigurationResolver {
private final Map<String, Object> extensionMap;

@Inject
public ExtensionPluginConfigurationResolver(final DataPrepperConfiguration dataPrepperConfiguration,
Review comment (Member):
As noted in another comment, we should not tie extensions configurations directly with pipeline configurations. Can we decouple these somewhat?

Reply (Collaborator, Author):
Would you mind if I do it in the other PR, #3340? There have been some tweaks to the interface.

Reply (Member):
That seems reasonable.

final PipelinesDataFlowModel pipelinesDataFlowModel) {
extensionMap = dataPrepperConfiguration.getPipelineExtensions() == null?
new HashMap<>() : new HashMap<>(dataPrepperConfiguration.getPipelineExtensions().getExtensionMap());
if (pipelinesDataFlowModel.getPipelineExtensions() != null) {
extensionMap.putAll(pipelinesDataFlowModel.getPipelineExtensions().getExtensionMap());
}
}

public Map<String, Object> getExtensionMap() {
return Collections.unmodifiableMap(extensionMap);
}
}
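Because the resolver's constructor seeds the map from `DataPrepperConfiguration` and then calls `putAll` with the pipeline-level entries, a pipeline-level extension setting overrides the data-prepper-config one for the same key. A hedged sketch of this precedence (the attribute values are hypothetical):

```yaml
# data-prepper-config.yaml
pipeline_extensions:
  test_extension:
    test_attribute: from-data-prepper-config

# pipelines.yaml
pipeline_extensions:
  test_extension:
    test_attribute: from-pipeline-yaml
# getExtensionMap().get("test_extension") resolves to the pipeline-level
# value, since putAll overlays it last.
```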
@@ -36,6 +36,7 @@ public class TestDataProvider {
public static final Integer DEFAULT_READ_BATCH_DELAY = 3_000;
public static final Integer TEST_DELAY = 3_000;
public static final String VALID_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/valid_multiple_pipeline_configuration.yml";
public static final String VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSIONS = "src/test/resources/valid_pipeline_configuration_with_extensions.yml";
public static final String VALID_SINGLE_PIPELINE_EMPTY_SOURCE_PLUGIN_FILE = "src/test/resources/single_pipeline_valid_empty_source_plugin_settings.yml";
public static final String CONNECTED_PIPELINE_ROOT_SOURCE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_root_source.yml";
public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_child_pipeline.yml";
@@ -44,6 +45,8 @@ public class TestDataProvider {
public static final String MISSING_NAME_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_name_multiple_pipeline_configuration.yml";
public static final String MISSING_PIPELINE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_pipeline_multiple_pipeline_configuration.yml";
public static final String MULTI_FILE_PIPELINE_DIRECTOTRY = "src/test/resources/multi-pipelines";
public static final String MULTI_FILE_PIPELINE_WITH_DISTRIBUTED_PIPELINE_EXTENSIONS_DIRECTOTRY = "src/test/resources/multi-pipelines-distributed-pipeline-extensions";
public static final String MULTI_FILE_PIPELINE_WITH_SINGLE_PIPELINE_EXTENSIONS_DIRECTOTRY = "src/test/resources/multi-pipelines-single-pipeline-extensions";
public static final String SINGLE_FILE_PIPELINE_DIRECTOTRY = "src/test/resources/single-pipeline";
public static final String EMPTY_PIPELINE_DIRECTOTRY = "src/test/resources/no-pipelines";
public static final String VALID_MULTIPLE_SINKS_CONFIG_FILE = "src/test/resources/valid_multiple_sinks.yml";
@@ -61,6 +61,8 @@
class PipelineTransformerTests {
private PeerForwarderProvider peerForwarderProvider;

@Mock
private PipelinesDataFlowModel pipelinesDataFlowModel;
@Mock
private RouterFactory routerFactory;

@@ -99,6 +101,7 @@ void setUp() {

coreContext.scan(DefaultPluginFactory.class.getPackage().getName());
coreContext.registerBean(DataPrepperConfiguration.class, () -> dataPrepperConfiguration);
coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel);
coreContext.refresh();
pluginFactory = coreContext.getBean(DefaultPluginFactory.class);
}
@@ -1,11 +1,16 @@
package org.opensearch.dataprepper.parser;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.TestDataProvider;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;

import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -20,6 +25,19 @@ void parseConfiguration_with_multiple_valid_pipelines() {
equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES));
}

@Test
void parseConfiguration_with_valid_pipelines_and_extensions() {
final PipelinesDataflowModelParser pipelinesDataflowModelParser =
new PipelinesDataflowModelParser(TestDataProvider.VALID_PIPELINE_CONFIG_FILE_WITH_EXTENSIONS);
final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration();
assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(),
equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES));
assertThat(actualPipelinesDataFlowModel.getPipelineExtensions(), notNullValue());
assertThat(actualPipelinesDataFlowModel.getPipelineExtensions().getExtensionMap(), equalTo(
Map.of("test_extension", Map.of("test_attribute", "test_string_1"))
));
}

@Test
void parseConfiguration_with_incompatible_version_should_throw() {
final PipelinesDataflowModelParser pipelinesDataflowModelParser =
@@ -50,6 +68,22 @@ void parseConfiguration_from_directory_with_multiple_files_creates_the_correct_m
assertThat(actualPipelinesDataFlowModel.getDataPrepperVersion(), nullValue());
}

@ParameterizedTest
@ValueSource(strings = {
TestDataProvider.MULTI_FILE_PIPELINE_WITH_DISTRIBUTED_PIPELINE_EXTENSIONS_DIRECTOTRY,
TestDataProvider.MULTI_FILE_PIPELINE_WITH_SINGLE_PIPELINE_EXTENSIONS_DIRECTOTRY
})
void parseConfiguration_from_directory_with_multiple_files_and_pipeline_extensions_should_throw(
        final String pipelineDirectoryPath) {
final PipelinesDataflowModelParser pipelinesDataflowModelParser =
new PipelinesDataflowModelParser(pipelineDirectoryPath);
final ParseException actualException = assertThrows(
ParseException.class, pipelinesDataflowModelParser::parseConfiguration);
assertThat(actualException.getMessage(), equalTo(
"Pipeline extensions and configurations must all be defined in a single YAML file " +
"if pipeline_extensions is configured."));
}

@Test
void parseConfiguration_from_directory_with_single_file_creates_the_correct_model() {
final PipelinesDataflowModelParser pipelinesDataflowModelParser =
@@ -8,6 +8,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
@@ -34,6 +35,8 @@
*/
@ExtendWith(MockitoExtension.class)
class DefaultPluginFactoryIT {
@Mock
private PipelinesDataFlowModel pipelinesDataFlowModel;
@Mock
private DataPrepperConfiguration dataPrepperConfiguration;
private String pluginName;
@@ -61,6 +64,7 @@ private DefaultPluginFactory createObjectUnderTest() {
coreContext.scan(DefaultPluginFactory.class.getPackage().getName());
coreContext.register(PluginBeanFactoryProvider.class);
coreContext.registerBean(DataPrepperConfiguration.class, () -> dataPrepperConfiguration);
coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel);
coreContext.refresh();

return coreContext.getBean(DefaultPluginFactory.class);