diff --git a/data-prepper-api/build.gradle b/data-prepper-api/build.gradle
index d4c10dfa19..4baf4220b7 100644
--- a/data-prepper-api/build.gradle
+++ b/data-prepper-api/build.gradle
@@ -23,7 +23,7 @@ dependencies {
 jacocoTestCoverageVerification {
     dependsOn(jacocoTestReport)
     violationRules {
-        rule { //in addition to core projects rule - this one checks for 100% code coverage for this project
+        rule {
             limit {
                 minimum = 1.0
             }
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java
index 936a5445e8..a84fd1ae26 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/SinkModel.java
@@ -29,7 +29,7 @@
 @JsonDeserialize(using = SinkModel.SinkModelDeserializer.class)
 public class SinkModel extends PluginModel {
-    SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
+    public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
         this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, pluginSettings));
     }
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java
index 09aad97ac4..0f96717c6c 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java
@@ -87,6 +87,7 @@ public Map<String, Pipeline> transformConfiguration() {
                         Map.Entry::getKey,
                         entry -> new PipelineConfiguration(entry.getValue())
                 ));
+        final List<String> allPipelineNames = PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigurationMap);
         // LinkedHashMap to preserve insertion order
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java
index 107302ce06..f4f586c5b6 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java
@@ -6,7 +6,9 @@
 package org.opensearch.dataprepper.parser.config;
 
 import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
 import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
+import org.opensearch.dataprepper.model.event.EventFactory;
 import org.opensearch.dataprepper.model.plugin.PluginFactory;
 import org.opensearch.dataprepper.parser.PipelineTransformer;
 import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
@@ -14,14 +16,22 @@
 import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader;
 import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationReader;
 import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser;
+import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
+import org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer;
+import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineConfigurationTransformer;
 import org.opensearch.dataprepper.pipeline.router.RouterFactory;
 import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
-import org.opensearch.dataprepper.model.event.EventFactory;
-import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
+import org.springframework.context.annotation.Primary;
 
 @Configuration
+@ComponentScan(basePackages = {
+        "org.opensearch.dataprepper.pipeline.parser",
+        "org.opensearch.dataprepper.plugin"
+})
 public class PipelineParserConfiguration {
 
     @Bean
@@ -35,7 +45,7 @@ public PipelineTransformer pipelineParser(
             final EventFactory eventFactory,
             final AcknowledgementSetManager acknowledgementSetManager,
             final SourceCoordinatorFactory sourceCoordinatorFactory
-    ) {
+    ) {
         return new PipelineTransformer(pipelinesDataFlowModel,
                 pluginFactory,
                 peerForwarderProvider,
@@ -60,7 +70,21 @@ public PipelinesDataflowModelParser pipelinesDataflowModelParser(
     }
 
     @Bean
+    public PipelineConfigurationTransformer pipelineConfigTransformer(RuleEvaluator ruleEvaluator) {
+        return new DynamicConfigTransformer(ruleEvaluator);
+    }
+
+
+    @Bean(name = "pipelinesDataFlowModel")
+    @Primary
     public PipelinesDataFlowModel pipelinesDataFlowModel(
+            PipelineConfigurationTransformer pipelineConfigTransformer,
+            @Qualifier("preTransformedDataFlowModel") PipelinesDataFlowModel preTransformedDataFlowModel) {
+        return pipelineConfigTransformer.transformConfiguration(preTransformedDataFlowModel);
+    }
+
+    @Bean(name = "preTransformedDataFlowModel")
+    public PipelinesDataFlowModel preTransformedDataFlowModel(
             final PipelinesDataflowModelParser pipelinesDataflowModelParser) {
         return pipelinesDataflowModelParser.parseConfiguration();
     }
diff --git a/data-prepper-pipeline-parser/build.gradle b/data-prepper-pipeline-parser/build.gradle
index e5c3d5b07e..c270ad0dcd 100644
--- a/data-prepper-pipeline-parser/build.gradle
+++ b/data-prepper-pipeline-parser/build.gradle
@@ -12,7 +12,26 @@ group = 'org.opensearch.dataprepper.core'
 
 dependencies {
     implementation project(':data-prepper-api')
     implementation project(':data-prepper-plugins:blocking-buffer')
-    implementation 'com.fasterxml.jackson.core:jackson-databind'
+    implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3'
     implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
     implementation 'org.apache.commons:commons-collections4:4.4'
+    implementation 'org.projectlombok:lombok:1.18.22'
+    implementation 'com.jayway.jsonpath:json-path:2.6.0'
+    implementation 'javax.inject:javax.inject:1'
+    implementation(libs.spring.core) {
+        exclude group: 'commons-logging', module: 'commons-logging'
+    }
+    implementation(libs.spring.context) {
+        exclude group: 'commons-logging', module: 'commons-logging'
+    }
+    implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
+    testImplementation testLibs.bundles.junit
+    testImplementation testLibs.bundles.mockito
+    testImplementation testLibs.hamcrest
+    testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
+    testImplementation 'org.powermock:powermock-api-mockito2:2.0.9'
+    testImplementation 'org.assertj:assertj-core:3.20.2'
+    testImplementation 'junit:junit:4.13.2'
+    compileOnly 'org.projectlombok:lombok:1.18.20'
+    annotationProcessor 'org.projectlombok:lombok:1.18.20'
 }
\ No newline at end of file
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java
index 0867b1e824..2b92d35923 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationFileReader.java
@@ -1,5 +1,6 @@
 package org.opensearch.dataprepper.pipeline.parser;
 
+import static java.lang.String.format;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -13,8 +14,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static java.lang.String.format;
-
 public class PipelineConfigurationFileReader implements PipelineConfigurationReader {
     private static final Logger LOG = LoggerFactory.getLogger(PipelineConfigurationFileReader.class);
     private final String pipelineConfigurationFileLocation;
@@ -41,6 +40,7 @@ private List<InputStream> getInputStreamsForConfigurationFiles() {
             return inputStreams;
         } else if (configurationLocation.isDirectory()) {
             FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml"));
+
             List<InputStream> inputStreams = Stream.of(configurationLocation.listFiles(yamlFilter))
                     .map(this::getInputStreamForFile)
                     .filter(Objects::nonNull)
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationReader.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationReader.java
index 80a4c43c7b..0794f8e440 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationReader.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineConfigurationReader.java
@@ -6,9 +6,9 @@
 public interface PipelineConfigurationReader {
 
     /**
-     *
      * @return a List of InputStream that contains each of the pipeline configurations.
-     * the caller of this method is responsible for closing these input streams after they are used
+     * The caller of this method is responsible for closing these input streams after they are used.
      */
     List<InputStream> getPipelineConfigurationInputStreams();
+
 }
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java
new file mode 100644
index 0000000000..6650c6b9ee
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.pipeline.parser;
+
+import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
+import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.inject.Named;
+
+@Configuration
+public class PipelineTransformationConfiguration {
+    public static final String TEMPLATES_DIRECTORY_PATH = "TEMPLATES_DIRECTORY_PATH";
+    public static final String RULES_DIRECTORY_PATH = "VALIDATORS_DIRECTORY_PATH";
+
+    @Bean
+    @Named(RULES_DIRECTORY_PATH)
+    static String provideRulesDirectoryPath() {
+        return "resources/rules";
+    }
+
+    @Bean
+    @Named(TEMPLATES_DIRECTORY_PATH)
+    static String provideTemplateDirectoryPath() {
+        return "resources/templates";
+    }
+
+    @Bean
+    TransformersFactory transformersFactory(
+            @Named(RULES_DIRECTORY_PATH) String rulesDirectoryPath,
+            @Named(TEMPLATES_DIRECTORY_PATH) String templatesDirectoryPath
+    ) {
+        // Pass the injected directory paths, not the qualifier constants themselves.
+        return new TransformersFactory(rulesDirectoryPath, templatesDirectoryPath);
+    }
+
+    @Bean
+    public RuleEvaluator ruleEvaluator(TransformersFactory transformersFactory) {
+        return new RuleEvaluator(transformersFactory);
+    }
+}
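Note on the wiring above: preTransformedDataFlowModel holds the model exactly as parsed from the user's YAML, while the @Primary pipelinesDataFlowModel bean is what existing injection points receive, so downstream consumers see the transformed model without any code changes. A minimal sketch of the equivalent manual wiring (variable names are illustrative, not part of this PR):

    PipelinesDataflowModelParser parser = new PipelinesDataflowModelParser(pipelineConfigurationReader);
    PipelinesDataFlowModel preTransformed = parser.parseConfiguration();              // "preTransformedDataFlowModel" bean
    PipelineConfigurationTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
    PipelinesDataFlowModel effective = transformer.transformConfiguration(preTransformed); // @Primary "pipelinesDataFlowModel" bean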
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParser.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParser.java
index 547b235121..52c17e4e92 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParser.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParser.java
@@ -8,6 +8,7 @@
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import static java.lang.String.format;
 import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
 import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
 import org.opensearch.dataprepper.model.configuration.PipelineModel;
@@ -22,8 +23,6 @@
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-import static java.lang.String.format;
-
 public class PipelinesDataflowModelParser {
     private static final Logger LOG = LoggerFactory.getLogger(PipelinesDataflowModelParser.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory())
@@ -35,9 +34,11 @@ public PipelinesDataflowModelParser(final PipelineConfigurationReader pipelineCo
         this.pipelineConfigurationReader = pipelineConfigurationReader;
     }
 
+
     public PipelinesDataFlowModel parseConfiguration() {
         final List<PipelinesDataFlowModel> pipelinesDataFlowModels = parseStreamsToPipelinesDataFlowModel();
-        return mergePipelinesDataModels(pipelinesDataFlowModels);
+        PipelinesDataFlowModel pipelinesDataFlowModel = mergePipelinesDataModels(pipelinesDataFlowModels);
+        return pipelinesDataFlowModel;
     }
 
     private void validateDataPrepperVersion(final DataPrepperVersion version) {
@@ -89,4 +90,5 @@ private PipelinesDataFlowModel mergePipelinesDataModels(
         return pipelineExtensionsList.isEmpty() ?
                 new PipelinesDataFlowModel(pipelinesDataFlowModelMap) :
                 new PipelinesDataFlowModel(pipelineExtensionsList.get(0), pipelinesDataFlowModelMap);
     }
+
 }
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java
new file mode 100644
index 0000000000..5971fe17e3
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.pipeline.parser.rule;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.ParseContext;
+import com.jayway.jsonpath.PathNotFoundException;
+import com.jayway.jsonpath.ReadContext;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
+import org.opensearch.dataprepper.model.configuration.PipelineModel;
+import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
+import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel;
+import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class RuleEvaluator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RuleEvaluator.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
+    private final TransformersFactory transformersFactory;
+    private String PLUGIN_NAME = null;
+
+    public RuleEvaluator(TransformersFactory transformersFactory) {
+        this.transformersFactory = transformersFactory;
+    }
+
+    public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelineModel) {
+        return isDocDBSource(pipelineModel);
+    }
+
+    /**
+     * Evaluates the model against predefined rules. The result contains the name of
+     * the pipeline that needs transformation, the boolean evaluation result, and the
+     * corresponding template model.
+     * Assumption: only one pipeline can have a transformation.
+     * @param pipelinesModel
+     * @return RuleEvaluatorResult
+     */
+    private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel) {
+        PLUGIN_NAME = "documentdb";
+        String pluginRulesPath = transformersFactory.getPluginRuleFileLocation(PLUGIN_NAME);
+        Map<String, PipelineModel> pipelines = pipelinesModel.getPipelines();
+
+        for (Map.Entry<String, PipelineModel> entry : pipelines.entrySet()) {
+            try {
+                String pipelineJson = OBJECT_MAPPER.writeValueAsString(entry);
+                if (evaluate(pipelineJson, pluginRulesPath)) {
+                    LOG.debug("Rule path {} is evaluated true for pipelineJson {}", pluginRulesPath, pipelineJson);
+
+                    String templateFilePath = transformersFactory.getPluginTemplateFileLocation(PLUGIN_NAME);
+                    PipelineTemplateModel templateModel = yamlMapper.readValue(new File(templateFilePath),
+                            PipelineTemplateModel.class);
+                    LOG.debug("Chosen template file {}", templateFilePath);
+
+                    return RuleEvaluatorResult.builder()
+                            .withEvaluatedResult(true)
+                            .withPipelineTemplateModel(templateModel)
+                            .withPipelineName(entry.getKey())
+                            .build();
+                }
+            } catch (JsonProcessingException e) {
+                LOG.error("Error processing json");
+                throw new RuntimeException(e);
+            } catch (IOException e) {
+                LOG.error("Error reading file");
+                throw new RuntimeException(e);
+            }
+        }
+        return RuleEvaluatorResult.builder()
+                .withEvaluatedResult(false)
+                .withPipelineName(null)
+                .withPipelineTemplateModel(null)
+                .build();
+    }
+
+    private Boolean evaluate(String pipelinesJson,
+                             String rulePath) {
+
+        Configuration parseConfig = Configuration.builder()
+                .jsonProvider(new JacksonJsonProvider())
+                .mappingProvider(new JacksonMappingProvider())
+                .options(Option.AS_PATH_LIST)
+                .build();
+        ParseContext parseContext = JsonPath.using(parseConfig);
+        ReadContext readPathContext = parseContext.parse(pipelinesJson);
+
+        try {
+            RuleTransformerModel rulesModel = yamlMapper.readValue(new File(rulePath), RuleTransformerModel.class);
+            List<String> rules = rulesModel.getApplyWhen();
+            for (String rule : rules) {
+                // read() throws PathNotFoundException when the rule's JsonPath has no match.
+                Object result = readPathContext.read(rule);
+            }
+        } catch (IOException e) {
+            LOG.warn("Error reading file {}", rulePath);
+            return false;
+        } catch (PathNotFoundException e) {
+            LOG.warn("Path not found {}", rulePath);
+            return false;
+        }
+        return true;
+    }
+}
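For intuition, evaluate() succeeds only when every apply_when JsonPath matches the serialized pipeline entry. A minimal sketch with assumed values (Jayway json-path, configured as above with Option.AS_PATH_LIST):

    // Each pipeline entry is serialized to JSON and probed with the rule's expressions.
    String pipelineJson = "{\"my-pipeline\":{\"source\":{\"documentdb\":{\"collections\":[{\"s3_bucket\":\"my-bucket\"}]}}}}";
    ReadContext ctx = JsonPath.using(Configuration.builder()
            .jsonProvider(new JacksonJsonProvider())
            .mappingProvider(new JacksonMappingProvider())
            .options(Option.AS_PATH_LIST)
            .build()).parse(pipelineJson);
    ctx.read("$..source.documentdb");  // match: returns the list of matching paths
    ctx.read("$..source.opensearch");  // no match: throws PathNotFoundException, so the rule fails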
0000000000..0ad9e45b72 --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.pipeline.parser.rule; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.List; + +@Data +public class RuleTransformerModel { + + @JsonProperty("apply_when") + private List applyWhen; + + public RuleTransformerModel() { + } + + public RuleTransformerModel(List applyWhen) { + this.applyWhen = applyWhen; + } + + public List getApplyWhen() { + return applyWhen; + } + + public void setApplyWhen(List applyWhen) { + this.applyWhen = applyWhen; + } + + @Override + public String toString() { + return "RuleConfiguration{" + + "applyWhen=" + applyWhen + + '}'; + } +} \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java new file mode 100644 index 0000000000..6bb796b9a9 --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java @@ -0,0 +1,408 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.pipeline.parser.transformer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.PathNotFoundException; +import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider; +import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; +import static java.lang.String.format; +import org.opensearch.dataprepper.model.configuration.PipelineModel; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; +import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator; +import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluatorResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.transform.TransformerException; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class DynamicConfigTransformer implements PipelineConfigurationTransformer { + + private static final Logger LOG = LoggerFactory.getLogger(DynamicConfigTransformer.class); + private final ObjectMapper objectMapper = new ObjectMapper(); + private final RuleEvaluator ruleEvaluator; + + /** + * Placeholder will look like "<>" + */ + private final Pattern PLACEHOLDER_PATTERN = Pattern.compile("\\<\\<\\s*(.+?)\\s*>>"); + /** + * Placeholder used to find "<>" + */ + private static final String PIPELINE_NAME_PLACEHOLDER_REGEX = "\\<\\<\\s*" + Pattern.quote("pipeline-name") + "\\s*\\>\\>"; + + /** + * This is the root node of the template json. 
+     * It is obtained when converting the template model to its corresponding json
+     * and will always be a constant.
+     */
+    private static final String TEMPLATE_PIPELINE_ROOT_STRING = "templatePipelines";
+
+    /**
+     * Placeholder for executing functions at runtime, based on a template parameter.
+     * Example: <<FUNCTION_NAME:calculateDepth,PARAMETER:$.<<pipeline-name>>.source.documentdb.s3_prefix>>
+     */
+    private static final String FUNCTION_CALL_PLACEHOLDER_REGEX = "FUNCTION_NAME:(.*?),PARAMETER:(.*)";
+    private final Pattern FUNCTION_CALL_PLACEHOLDER_PATTERN = Pattern.compile(FUNCTION_CALL_PLACEHOLDER_REGEX);
+
+    /**
+     * Json Path expressions like "[?(@.field)]" seem to always return an ArrayNode even if
+     * the result is an ObjectNode. JSON_PATH_ARRAY_DISAMBIGUATOR_PATTERN is used to detect
+     * and disambiguate such paths.
+     */
+    private static final String JSON_PATH_ARRAY_DISAMBIGUATOR_PATTERN = "[?(@.";
+    private static final String RECURSIVE_JSON_PATH_PATH = "$..";
+    private static final String JSON_PATH_IDENTIFIER = "$.";
+    private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER";
+
+
+    Configuration parseConfigWithJsonNode = Configuration.builder()
+            .jsonProvider(new JacksonJsonNodeJsonProvider())
+            .mappingProvider(new JacksonMappingProvider())
+            .options(Option.SUPPRESS_EXCEPTIONS)
+            .build();
+
+    public DynamicConfigTransformer(RuleEvaluator ruleEvaluator) {
+        this.ruleEvaluator = ruleEvaluator;
+    }
+
+    /**
+     * High-level explanation:
+     * Step 1: Evaluate if transformation is needed.
+     * Step 2: Create a map (placeholdersMap) keyed by placeholder, with the list of JsonPaths
+     * at which that placeholder occurs in templateJson. It is populated recursively by
+     * tracking the placeholders and storing their paths along the way.
+     * Step 3: Create a map (pipelineExactPathMap) with the exact path for each placeholder
+     * value in the original pipelineJson.
+     * Step 4: For every placeholder, replace the template node at the corresponding template
+     * json path with the node from the original pipelineJson.
+     * Step 5: Convert the result to PipelinesDataFlowModel.
+     *
+     * @param preTransformedPipelinesDataFlowModel - represents the pre-transformed pipeline data flow model
+     * @return PipelinesDataFlowModel - Transformed PipelinesDataFlowModel.
+     */
+    @Override
+    public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preTransformedPipelinesDataFlowModel) {
+        RuleEvaluatorResult ruleEvaluatorResult = ruleEvaluator.isTransformationNeeded(preTransformedPipelinesDataFlowModel);
+
+        if (!ruleEvaluatorResult.isEvaluatedResult() ||
+                ruleEvaluatorResult.getPipelineName() == null) {
+            return preTransformedPipelinesDataFlowModel;
+        }
+
+        // To differentiate from sub-pipelines that don't need transformation.
+        String pipelineNameThatNeedsTransformation = ruleEvaluatorResult.getPipelineName();
+        PipelineTemplateModel templateModel = ruleEvaluatorResult.getPipelineTemplateModel();
+        try {
+
+            Map<String, PipelineModel> pipelines = preTransformedPipelinesDataFlowModel.getPipelines();
+            Map<String, PipelineModel> pipelineMap = new HashMap<>();
+            pipelineMap.put(pipelineNameThatNeedsTransformation,
+                    pipelines.get(pipelineNameThatNeedsTransformation));
+            String pipelineJson = objectMapper.writeValueAsString(pipelineMap);
+
+            String templateJsonStringWithPipelinePlaceholder = objectMapper.writeValueAsString(templateModel);
+
+            // Replace pipeline name placeholder with pipelineNameThatNeedsTransformation
+            String templateJsonString = replaceTemplatePipelineName(templateJsonStringWithPipelinePlaceholder,
+                    pipelineNameThatNeedsTransformation);
+
+            // Find all PLACEHOLDER_PATTERN in template json string
+            Map<String, List<String>> placeholdersMap = findPlaceholdersWithPathsRecursively(templateJsonString);
+            JsonNode templateRootNode = objectMapper.readTree(templateJsonString);
+
+            // get exact path in pipelineJson
+            Map<String, String> pipelineExactPathMap = findExactPath(placeholdersMap, pipelineJson);
+
+            // replace placeholder with actual value in the template context
+            placeholdersMap.forEach((placeholder, templateJsonPathList) -> {
+                for (String templateJsonPath : templateJsonPathList) {
+                    String pipelineExactJsonPath = pipelineExactPathMap.get(placeholder);
+
+                    if (isJsonPath(pipelineExactJsonPath)) {
+                        JsonNode pipelineNode = JsonPath.using(parseConfigWithJsonNode).parse(pipelineJson).read(pipelineExactJsonPath);
+                        // Json Path expressions like "[?(@.field)]" seem to always return an ArrayNode even for an Object.
+                        // Example: $.pipeline.sink[?(@.opensearch)].opensearch.aws will always return an array.
+                        if (pipelineExactJsonPath.contains(JSON_PATH_ARRAY_DISAMBIGUATOR_PATTERN) &&
+                                pipelineNode.isArray() && pipelineNode.size() == 1) {
+                            pipelineNode = pipelineNode.get(0);
+                        }
+                        replaceNode(templateRootNode, templateJsonPath, pipelineNode);
+                    } else { // in case it was a function call
+                        JsonNode pipelineNode = objectMapper.valueToTree(pipelineExactJsonPath);
+                        replaceNode(templateRootNode, templateJsonPath, pipelineNode);
+                    }
+                }
+            });
+
+            PipelinesDataFlowModel transformedPipelinesDataFlowModel = getTransformedPipelinesDataFlowModel(pipelineNameThatNeedsTransformation, preTransformedPipelinesDataFlowModel, templateRootNode);
+            return transformedPipelinesDataFlowModel;
+        } catch (JsonProcessingException | TransformerException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Convert templateRootNode, which contains the transformed json, to PipelinesDataFlowModel.
+     *
+     * @param pipelineNameThatNeedsTransformation
+     * @param preTransformedPipelinesDataFlowModel
+     * @param templateRootNode - transformed json node.
+     * @return PipelinesDataFlowModel - transformed model.
+     * @throws JsonProcessingException
+     */
+    private PipelinesDataFlowModel getTransformedPipelinesDataFlowModel(String pipelineNameThatNeedsTransformation, PipelinesDataFlowModel preTransformedPipelinesDataFlowModel, JsonNode templateRootNode) throws JsonProcessingException {
+        // update template json
+        String transformedJson = objectMapper.writeValueAsString(templateRootNode);
+        LOG.debug("{} pipeline has been transformed to :{}", pipelineNameThatNeedsTransformation, transformedJson);
+
+        // convert transformedJson to PipelineModel with the data from preTransformedDataFlowModel:
+        // transform transformedJson to a Map
+        Map<String, Object> transformedConfigMap = objectMapper.readValue(transformedJson, Map.class);
+
+        Map<String, PipelineModel> pipelines = preTransformedPipelinesDataFlowModel.getPipelines();
+
+        // get the root of the transformed pipeline model to reach the actual pipelines;
+        // direct conversion to PipelinesDataFlowModel throws an exception.
+        Map<String, PipelineModel> transformedPipelines = (Map<String, PipelineModel>) transformedConfigMap.get(TEMPLATE_PIPELINE_ROOT_STRING);
+        pipelines.forEach((pipelineName, pipeline) -> {
+            if (!pipelineName.equals(pipelineNameThatNeedsTransformation)) {
+                transformedPipelines.put(pipelineName, pipeline);
+            }
+        });
+
+        // version is not required here as it is already handled in parseStreamToPipelineDataFlowModel
+        PipelinesDataFlowModel transformedPipelinesDataFlowModel = new PipelinesDataFlowModel(
+                preTransformedPipelinesDataFlowModel.getPipelineExtensions(),
+                transformedPipelines
+        );
+        String transformedPipelinesDataFlowModelJson = objectMapper.writeValueAsString(transformedPipelinesDataFlowModel);
+        LOG.debug("Transformed PipelinesDataFlowModel: {}", transformedPipelinesDataFlowModelJson);
+
+        return transformedPipelinesDataFlowModel;
+    }
+
+    private String replaceTemplatePipelineName(String templateJsonStringWithPipelinePlaceholder, String pipelineName) {
+        return templateJsonStringWithPipelinePlaceholder.replaceAll(PIPELINE_NAME_PLACEHOLDER_REGEX, pipelineName);
+    }
+
+    /**
+     * Recursively walks through the json to find placeholders matching a certain regex
+     * pattern, keeping track of the path along the way.
+     *
+     * @param json
+     * @return Map<String, List<String>>; key: placeholder, value: list of JsonPaths in templateJson
+     * @throws IOException
+     */
+    private Map<String, List<String>> findPlaceholdersWithPathsRecursively(String json) throws IOException {
+
+        JsonNode rootNode = objectMapper.readTree(json);
+        Map<String, List<String>> placeholdersWithPaths = new HashMap<>();
+        populateMapWithPlaceholderPaths(rootNode, "", placeholdersWithPaths);
+        return placeholdersWithPaths;
+    }
+
+    private void populateMapWithPlaceholderPaths(JsonNode currentNode, String currentPath, Map<String, List<String>> placeholdersWithPaths) {
+        if (currentNode.isObject()) {
+            Iterator<Map.Entry<String, JsonNode>> fields = currentNode.fields();
+            while (fields.hasNext()) {
+                Map.Entry<String, JsonNode> entry = fields.next();
+                String path = currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
+                populateMapWithPlaceholderPaths(entry.getValue(), path, placeholdersWithPaths);
+            }
+        } else if (currentNode.isArray()) {
+            for (int i = 0; i < currentNode.size(); i++) {
+                String path = currentPath + "[" + i + "]";
+                populateMapWithPlaceholderPaths(currentNode.get(i), path, placeholdersWithPaths);
+            }
+        } else if (currentNode.isValueNode()) {
+            String placeHolderValue = currentNode.asText();
+            Matcher matcher = PLACEHOLDER_PATTERN.matcher(placeHolderValue);
+            if (matcher.find()) {
+                if (!placeholdersWithPaths.containsKey(placeHolderValue)) {
+                    List<String> paths = new ArrayList<>();
+                    paths.add(currentPath);
+                    placeholdersWithPaths.put(placeHolderValue, paths);
+                } else {
+                    List<String> existingPaths = placeholdersWithPaths.get(placeHolderValue);
+                    existingPaths.add(currentPath);
+                    placeholdersWithPaths.put(placeHolderValue, existingPaths);
+                }
+            }
+        }
+    }
+
+    /**
+     * Gets the exact path; this is to avoid getting array values (even though the node
+     * might not be an array) given a recursive expression like "$..<<placeholder-value>>".
+     *
+     * @param placeholdersMap
+     * @return Map<String, String>; key: placeholder, value: exact path
+     * @throws IOException
+     */
+    private Map<String, String> findExactPath(Map<String, List<String>> placeholdersMap, String pipelineJson) throws IOException, TransformerException {
+        Map<String, String> mapWithPaths = new HashMap<>();
+        for (String genericPathPlaceholder : placeholdersMap.keySet()) {
+            String placeHolderValue = getValueFromPlaceHolder(genericPathPlaceholder);
+
+            String value = executeFunctionPlaceholder(placeHolderValue, pipelineJson);
+
+            // Recursive patterns in json path are NOT allowed
+            if (value.contains(RECURSIVE_JSON_PATH_PATH)) {
+                throw new TransformerException(format("Json path %s is not supported", value));
+            }
+            mapWithPaths.put(genericPathPlaceholder, value);
+        }
+        return mapWithPaths;
+    }
+
+    /**
+     * Get the value from the placeholder field.
+     * Removes the angle brackets surrounding the value.
+     *
+     * @param placeholder
+     * @return String - placeholder value.
+     */
+    private String getValueFromPlaceHolder(String placeholder) {
+        // placeholder is valid here as it was regex-matched in populateMapWithPlaceholderPaths
+        return placeholder.substring(2, placeholder.length() - 2);
+    }
+
+    /**
+     * @param functionPlaceholderValue
+     * @return String - value of the executed function
+     */
+    private String executeFunctionPlaceholder(String functionPlaceholderValue, String pipelineJson) {
+        Matcher functionMatcher = FUNCTION_CALL_PLACEHOLDER_PATTERN.matcher(functionPlaceholderValue);
+        if (functionMatcher.find()) {
+            String functionName = functionMatcher.group(1);
+            String parameter = functionMatcher.group(2);
+            try {
+                String parameterValue = (String) parseParameter(parameter, pipelineJson);
+                String value = (String) invokeMethod(functionName, String.class, parameterValue);
+                return value;
+            } catch (ReflectiveOperationException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            return functionPlaceholderValue;
+        }
+    }
+
+    private Object parseParameter(String parameter, String pipelineJson) {
+        if (isJsonPath(parameter)) {
+            JsonNode pipelineNode = JsonPath.using(parseConfigWithJsonNode).parse(pipelineJson).read(parameter);
+            if (!pipelineNode.isValueNode()) {
+                throw new RuntimeException("parameter has to be a value node");
+            }
+            String nodeValue = pipelineNode.asText();
+            return nodeValue;
+        }
+        return parameter;
+    }
+
+    /**
+     * Check whether the parameter passed is a json path or not.
+     *
+     * @param parameter
+     * @return boolean
+     */
+    private boolean isJsonPath(String parameter) {
+        try {
+            if (parameter.contains(JSON_PATH_IDENTIFIER)) {
+                JsonPath.compile(parameter);
+                return true;
+            }
+            return false;
+        } catch (IllegalArgumentException | PathNotFoundException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Specific to the DocDB depth field.
+     *
+     * @param s3Prefix
+     * @return String - the scan depth: the number of prefix segments plus 3
+     */
+    public String calculateDepth(String s3Prefix) {
+        return Integer.toString(s3Prefix.split("/").length + 3);
+    }
+
+    public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix) {
+        String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
+        return s3Prefix + "/" + envSourceCoordinationIdentifier;
+    }
+
+    /**
+     * Invokes a method dynamically on a given object.
+     *
+     * @param methodName    the name of the method to be invoked
+     * @param parameterType the Class object representing the parameter type
+     * @param arg           the parameter to be passed to the method
+     * @return the result of the method invocation
+     * @throws ReflectiveOperationException if the method cannot be invoked
+     */
+    public Object invokeMethod(String methodName, Class<?> parameterType, Object arg) throws ReflectiveOperationException {
+        // Get the Class object
+        Class<?> clazz = this.getClass();
+
+        // Get the Method object for the specified method and parameter type
+        Method method = clazz.getMethod(methodName, parameterType);
+
+        // Invoke the method on this object with the given argument
+        return method.invoke(this, arg);
+    }
+
+
+    /**
+     * Replaces the template node at the given jsonPath with the node from the
+     * original json.
+     *
+     * @param root
+     * @param jsonPath
+     * @param newNode
+     */
+    public void replaceNode(JsonNode root, String jsonPath, JsonNode newNode) {
+        try {
+            if (newNode == null) {
+                LOG.debug("Did not find jsonPath {}", jsonPath);
+            }
+            // Read the parent path of the target node
+            String parentPath = jsonPath.substring(0, jsonPath.lastIndexOf('.'));
+            String fieldName = jsonPath.substring(jsonPath.lastIndexOf('.') + 1);
+
+            // Find the parent node
+            JsonNode parentNode = JsonPath.using(parseConfigWithJsonNode).parse(root).read(parentPath);
+
+            // Replace the target field in the parent node
+            if (parentNode instanceof ObjectNode) {
+                ((ObjectNode) parentNode).replace(fieldName, newNode);
+            } else {
+                LOG.error("Path does not point to an object node");
+                throw new IllegalArgumentException("Path does not point to an object node");
+            }
+        } catch (PathNotFoundException e) {
+            LOG.error("JsonPath {} not found", jsonPath);
+            throw new PathNotFoundException(format("JsonPath %s not found", jsonPath));
+        }
+    }
+}
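To make the placeholder flow concrete, a sketch under assumed values (illustrative only, not code from this PR). For a matched pipeline named "my-pipeline", a template value "<<$.<<pipeline-name>>.workers>>" is first rewritten to "<<$.my-pipeline.workers>>"; the JsonPath inside it is resolved against the user's pipeline JSON, and the resulting node is spliced into the template at the placeholder's own location (within DynamicConfigTransformer):

    // placeholdersMap:      {"<<$.my-pipeline.workers>>" -> ["templatePipelines.my-pipeline.workers"]}
    // pipelineExactPathMap: {"<<$.my-pipeline.workers>>" -> "$.my-pipeline.workers"}
    JsonNode workersNode = JsonPath.using(parseConfigWithJsonNode)
            .parse("{\"my-pipeline\":{\"workers\":2}}")
            .read("$.my-pipeline.workers");                        // -> 2
    replaceNode(templateRootNode, "templatePipelines.my-pipeline.workers", workersNode);
    // templateRootNode now contains {"templatePipelines":{"my-pipeline":{"workers":2}}}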
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineConfigurationTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineConfigurationTransformer.java
new file mode 100644
index 0000000000..408775b538
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineConfigurationTransformer.java
@@ -0,0 +1,13 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.pipeline.parser.transformer;
+
+import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
+
+public interface PipelineConfigurationTransformer {
+
+    PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel pipelinesDataFlowModel);
+
+}
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineTemplateModel.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineTemplateModel.java
new file mode 100644
index 0000000000..1c10dc1b69
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineTemplateModel.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.pipeline.parser.transformer;
+
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSetter;
+import com.fasterxml.jackson.annotation.Nulls;
+import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
+import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipelineTemplateModel {
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonSetter(nulls = Nulls.SKIP)
+    private DataPrepperVersion version;
+
+    @JsonProperty("pipeline_configurations")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonSetter(nulls = Nulls.SKIP)
+    private PipelineExtensions pipelineExtensions;
+
+    @JsonAnySetter
+    private Map<String, Object> pipelines = new HashMap<>();
+
+    @JsonCreator
+    @SuppressWarnings("unused")
+    public PipelineTemplateModel() {
+    }
+
+    public PipelineTemplateModel(final Map<String, Object> pipelines) {
+        this.pipelines = pipelines;
+    }
+
+    public Map<String, Object> getTemplatePipelines() {
+        return pipelines;
+    }
+}
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineTransformationPathProvider.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineTransformationPathProvider.java
new file mode 100644
index 0000000000..a082e58f1f
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineTransformationPathProvider.java
@@ -0,0 +1,13 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.pipeline.parser.transformer;
+
+public interface PipelineTransformationPathProvider {
+
+    String getTransformationTemplateDirectoryLocation();
+
+    String getTransformationRulesDirectoryLocation();
+
+}
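The TransformersFactory that follows resolves rule and template files purely by naming convention: <rules-dir>/<plugin>-rule.yaml and <templates-dir>/<plugin>-template.yaml. For example, with the default directory beans defined earlier:

    TransformersFactory factory = new TransformersFactory("resources/rules", "resources/templates");
    factory.getPluginRuleFileLocation("documentdb");     // "resources/rules/documentdb-rule.yaml"
    factory.getPluginTemplateFileLocation("documentdb"); // "resources/templates/documentdb-template.yaml"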
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java
new file mode 100644
index 0000000000..ec09e90fe7
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.pipeline.parser.transformer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import static org.opensearch.dataprepper.pipeline.parser.PipelineTransformationConfiguration.RULES_DIRECTORY_PATH;
+import static org.opensearch.dataprepper.pipeline.parser.PipelineTransformationConfiguration.TEMPLATES_DIRECTORY_PATH;
+
+import javax.inject.Named;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class TransformersFactory implements PipelineTransformationPathProvider {
+
+    private static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
+
+    private final String TEMPLATE_FILE_NAME_PATTERN = "-template.yaml";
+    private final String RULE_FILE_NAME_PATTERN = "-rule.yaml";
+    private final String templatesDirectoryPath;
+    private final String rulesDirectoryPath;
+    String PLUGIN_NAME = null;
+
+    public TransformersFactory(@Named(RULES_DIRECTORY_PATH) final String rulesDirectoryPath,
+                               @Named(TEMPLATES_DIRECTORY_PATH) final String templatesDirectoryPath) {
+        this.templatesDirectoryPath = templatesDirectoryPath;
+        this.rulesDirectoryPath = rulesDirectoryPath;
+    }
+
+    @Override
+    public String getTransformationTemplateDirectoryLocation() {
+        return templatesDirectoryPath;
+    }
+
+    @Override
+    public String getTransformationRulesDirectoryLocation() {
+        return rulesDirectoryPath;
+    }
+
+    public String getPluginTemplateFileLocation(String pluginName) {
+        if (pluginName == null || pluginName.isEmpty()) {
+            throw new RuntimeException("Transformation plugin not found");
+        }
+        return templatesDirectoryPath + "/" + pluginName + TEMPLATE_FILE_NAME_PATTERN;
+    }
+
+    public String getPluginRuleFileLocation(String pluginName) {
+        if (pluginName == null || pluginName.isEmpty()) {
+            throw new RuntimeException("Transformation plugin not found");
+        }
+        return rulesDirectoryPath + "/" + pluginName + RULE_FILE_NAME_PATTERN;
+    }
+
+    public PipelineTemplateModel getTemplateModel(String pluginName) {
+        String templatePath = getPluginTemplateFileLocation(pluginName);
+
+        try {
+            PipelineTemplateModel pipelineTemplateModel = YAML_MAPPER.readValue(new File(templatePath), PipelineTemplateModel.class);
+            return pipelineTemplateModel;
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/data-prepper-pipeline-parser/src/resources/rules/documentdb-rule.yaml b/data-prepper-pipeline-parser/src/resources/rules/documentdb-rule.yaml
new file mode 100644
index 0000000000..28da1406f8
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/resources/rules/documentdb-rule.yaml
@@ -0,0 +1,3 @@
+apply_when:
+  - "$..source.documentdb"
+  - "$..source.documentdb.collections[0].s3_bucket"
\ No newline at end of file
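Both expressions in this rule file must match for the documentdb transformation to fire. A minimal user pipeline that satisfies them (illustrative values, not shipped with this PR):

    my-pipeline:
      source:
        documentdb:
          collections:
            - s3_bucket: "my-export-bucket"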
"<<$.<>.source.documentdb.opensearch.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.documentdb.opensearch.aws.sts_header_overrides>>" + bucket: "<<$.<>.source.documentdb.s3_bucket>>" + threshold: + event_collect_timeout: "120s" + maximum_size: "2mb" + aggregate_threshold: + maximum_size: "256kb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + json: + - s3: + routes: + - stream_load + aws: + region: "<<$.<>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<>.source.documentdb.opensearch.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.documentdb.opensearch.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.documentdb.opensearch.aws.sts_header_overrides>>" + bucket: "<<$.<>.source.documentdb.s3_bucket>>" + threshold: + event_collect_timeout: "30s" + maximum_size: "1mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + json: + + "<>.s3": + workers: "<<$.<>.workers>>" + delay: "<<$.<>.delay>>" + buffer: + bounded_blocking: + batch_size: 125000 + buffer_size: 1000000 + source: + s3: + codec: + json: + compression: "none" + aws: + region: "<<$.<>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<>.source.documentdb.opensearch.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.documentdb.opensearch.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.documentdb.opensearch.aws.sts_header_overrides>>" + acknowledgments: true + delete_s3_objects_on_read: true + scan: + buckets: + - bucket: + name: "<<$.<>.source.documentdb.s3_bucket>>" + filter: + include_prefix: ["<>.source.documentdb.s3_prefix>>"] + depth: "<>.source.documentdb.s3_prefix>>" + scheduling: + interval: "1s" + processor: "<<$.<>.processor>>" + sink: "<<$.<>.sink>>" + routes: "<<$.<>.routes>>" \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java index 416730fc77..e7e9fbe310 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java @@ -5,11 +5,17 @@ package org.opensearch.dataprepper.pipeline.parser; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; +import static org.mockito.Mockito.when; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; @@ -24,13 +30,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; -import static 
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java
index 416730fc77..e7e9fbe310 100644
--- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/PipelinesDataflowModelParserTest.java
@@ -5,11 +5,17 @@
 package org.opensearch.dataprepper.pipeline.parser;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
+import static org.mockito.Mockito.when;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
 import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
@@ -24,13 +30,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.when;
-
 @ExtendWith(MockitoExtension.class)
 class PipelinesDataflowModelParserTest {
@@ -187,4 +186,26 @@ void parseConfiguration_from_directory_with_single_file_creates_the_correct_mode
         assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(),
                 equalTo(TestConfigurationProvider.VALID_MULTIPLE_PIPELINE_NAMES));
     }
+
+    @Test
+    void parseConfiguration_check_successful_transformation() {
+        final File directoryLocation = new File(TestConfigurationProvider.SINGLE_FILE_PIPELINE_DIRECTOTRY);
+        final List<FileInputStream> fileInputStreams = Stream.of(directoryLocation.listFiles())
+                .map(file -> {
+                    try {
+                        return new FileInputStream(file);
+                    } catch (FileNotFoundException e) {
+                        throw new RuntimeException(e);
+                    }
+                })
+                .collect(Collectors.toList());
+
+        when(pipelineConfigurationReader.getPipelineConfigurationInputStreams()).thenReturn(fileInputStreams);
+
+        final PipelinesDataflowModelParser pipelinesDataflowModelParser =
+                new PipelinesDataflowModelParser(pipelineConfigurationReader);
+        final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration();
+        assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(),
+                equalTo(TestConfigurationProvider.VALID_MULTIPLE_PIPELINE_NAMES));
+    }
 }
\ No newline at end of file
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/TestConfigurationProvider.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/TestConfigurationProvider.java
index 673a8dbc1e..cb23579bcc 100644
--- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/TestConfigurationProvider.java
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/TestConfigurationProvider.java
@@ -5,6 +5,8 @@
 package org.opensearch.dataprepper.pipeline.parser;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import org.opensearch.dataprepper.model.configuration.PluginModel;
 import org.opensearch.dataprepper.model.configuration.PluginSetting;
 import org.opensearch.dataprepper.model.configuration.SinkModel;
@@ -17,9 +19,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class TestConfigurationProvider {
     public static final String TEST_PIPELINE_NAME = "test-pipeline-1";
     public static final String TEST_PLUGIN_NAME_1 = "test-plugin-1";
@@ -37,6 +36,32 @@
     public static final String SINGLE_FILE_PIPELINE_DIRECTOTRY = "src/test/resources/single-pipeline";
     public static final String EMPTY_PIPELINE_DIRECTOTRY = "src/test/resources/no-pipelines";
     public static final String INCOMPATIBLE_VERSION_CONFIG_FILE = "src/test/resources/incompatible_version.yml";
+    public static final String TEMPLATES_SOURCE_TRANSFORMATION_DIRECTORY = "src/test/resources/transformation/templates/testSource";
+    public static final String RULES_TRANSFORMATION_DIRECTORY = "src/test/resources/transformation/rules";
+
+    public static final String USER_CONFIG_TRANSFORMATION_DOCDB_SIMPLE_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb-simple-userconfig.yaml";
+    public static final String USER_CONFIG_TRANSFORMATION_DOCDB1_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb1-userconfig.yaml";
+    public static final String USER_CONFIG_TRANSFORMATION_DOCDB2_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb2-userconfig.yaml";
+    public static final String USER_CONFIG_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb-userconfig.yaml";
+    public static final String USER_CONFIG_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb-subpipelines-userconfig.yaml";
+    public static final String USER_CONFIG_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE = "src/test/resources/transformation/userConfig/documentdb-function-userconfig.yaml";
+
+    public static final String RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE = "src/test/resources/transformation/rules/documentdb1-rule.yaml";
+    public static final String RULES_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE = "src/test/resources/transformation/rules/documentdb-rule.yaml";
+
+
+    public static final String TEMPLATE_TRANSFORMATION_DOCDB_SIMPLE_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb-simple-template.yaml";
+    public static final String TEMPLATE_TRANSFORMATION_DOCDB1_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb1-template.yaml";
+    public static final String TEMPLATE_TRANSFORMATION_DOCDB2_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb2-template.yaml";
+    public static final String TEMPLATE_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb-template.yaml";
+    public static final String TEMPLATE_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb-subpipelines-template.yaml";
+    public static final String TEMPLATE_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE = "src/test/resources/transformation/templates/testSource/documentdb-function-template.yaml";
+
+    public static final String EXPECTED_TRANSFORMATION_DOCDB1_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb1-expected.yaml";
+    public static final String EXPECTED_TRANSFORMATION_DOCDB2_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb2-expected.yaml";
+    public static final String EXPECTED_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb-expected.yaml";
+    public static final String EXPECTED_TRANSFORMATION_DOCUMENTDB_SUBPIPLINES_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb-subpipelines-expected.yaml";
+    public static final String EXPECTED_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE = "src/test/resources/transformation/expected/documentdb-function-expected.yaml";
 
     public static Set<String> VALID_MULTIPLE_PIPELINE_NAMES = new HashSet<>(Arrays.asList("test-pipeline-1",
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java
new file mode 100644
index 0000000000..11064546f5
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorResultTest.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.pipeline.parser.rule;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.jupiter.api.Test;
+import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel;
+
+class RuleEvaluatorResultTest {
+
+    @Test
+    void builder_createsObjectCorrectly() {
+        PipelineTemplateModel pipelineTemplateModel = new PipelineTemplateModel();
+        RuleEvaluatorResult result = RuleEvaluatorResult.builder()
+                .withEvaluatedResult(true)
+                .withPipelineName("testPipeline")
+                .withPipelineTemplateModel(pipelineTemplateModel)
+                .build();
+
+        assertTrue(result.isEvaluatedResult());
+        assertEquals(result.getPipelineName(), "testPipeline");
+        assertEquals(result.getPipelineTemplateModel(), pipelineTemplateModel);
+    }
+
+    @Test
+    void defaultConstructor_initializesFieldsCorrectly() {
+        RuleEvaluatorResult result = new RuleEvaluatorResult();
+        assertFalse(result.isEvaluatedResult());
+        assertNull(result.getPipelineName());
+        assertNull(result.getPipelineTemplateModel());
+    }
+
+    @Test
+    void allArgsConstructor_assignsFieldsCorrectly() {
+        PipelineTemplateModel pipelineTemplateModel = new PipelineTemplateModel();
+        RuleEvaluatorResult result = new RuleEvaluatorResult(true,
+                "testPipeline", pipelineTemplateModel);
+
+        assertTrue(result.isEvaluatedResult());
+        assertEquals(result.getPipelineName(), "testPipeline");
+        assertEquals(result.getPipelineTemplateModel(), pipelineTemplateModel);
+    }
+}
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java
new file mode 100644
index 0000000000..9b5bec5c57
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.pipeline.parser.rule;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
+import org.opensearch.dataprepper.model.configuration.PipelineModel;
+import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+import org.opensearch.dataprepper.model.configuration.SinkModel;
+import org.opensearch.dataprepper.pipeline.parser.TestConfigurationProvider;
+import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class RuleEvaluatorTest {
+
+    @Test
+    void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() {
+
+        // Set up
+        String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE;
+        String pluginName = "documentdb";
+        String pipelineName = "test-pipeline";
+        Map<String, Object> sourceOptions = new HashMap<>();
+        Map<String, Object> s3_bucket = new HashMap<>();
+        s3_bucket.put("s3_bucket", "bucket-name");
+        List<Map<String, Object>> collections = new ArrayList<>();
+        collections.add(s3_bucket);
sourceOptions.put("collections", collections); + final PluginModel source = new PluginModel(pluginName, sourceOptions); + final List processors = Collections.singletonList(new PluginModel("testProcessor", null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); + final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50); + + final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel( + (PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel)); + + TransformersFactory transformersFactory = Mockito.spy(new TransformersFactory( + TestConfigurationProvider.RULES_TRANSFORMATION_DIRECTORY, + TestConfigurationProvider.TEMPLATES_SOURCE_TRANSFORMATION_DIRECTORY + )); + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); + when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); + RuleEvaluatorResult result = ruleEvaluator.isTransformationNeeded(pipelinesDataFlowModel); + + // Assert + assertTrue(result.isEvaluatedResult()); + assertEquals(result.getPipelineName(), pipelineName); + } + + @Test + void test_isTransformationNeeded_ForOtherSource_ShouldReturn_False() { + // Set up + String pipelineName = "test-pipeline"; + Map sourceOptions = new HashMap(); + sourceOptions.put("option1", "1"); + sourceOptions.put("option2", null); + final PluginModel source = new PluginModel("http", sourceOptions); + final List processors = Collections.singletonList(new PluginModel("testProcessor", null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); + final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50); + + final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel( + (PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel)); + + TransformersFactory transformersFactory = Mockito.spy(new TransformersFactory( + TestConfigurationProvider.RULES_TRANSFORMATION_DIRECTORY, + TestConfigurationProvider.TEMPLATES_SOURCE_TRANSFORMATION_DIRECTORY + )); + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); + RuleEvaluatorResult result = ruleEvaluator.isTransformationNeeded(pipelinesDataFlowModel); + + // Assert + assertEquals(result.isEvaluatedResult(), false); + } + + @Test + void testThrowsExceptionOnFileError() { + TransformersFactory transformersFactory = mock(TransformersFactory.class); + String pipelineName = "test-pipeline"; + Map sourceOptions = new HashMap(); + sourceOptions.put("option1", "1"); + sourceOptions.put("option2", null); + final PluginModel source = new PluginModel("http", sourceOptions); + final List processors = Collections.singletonList(new PluginModel("testProcessor", null)); + final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); + final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50); + + final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel( + (PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel)); + + // Setup mock to throw an exception when file path is incorrect + 
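// Rule-file lookup for "documentdb" is stubbed to fail; isTransformationNeeded is expected to let this RuntimeException propagate.
+        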
when(transformersFactory.getPluginRuleFileLocation("documentdb")).thenThrow(new RuntimeException("File not found")); + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); + + // Execute and Assert + Exception exception = assertThrows(RuntimeException.class, () -> { + ruleEvaluator.isTransformationNeeded(pipelinesDataFlowModel); + }); + assertEquals("File not found", exception.getMessage()); + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java new file mode 100644 index 0000000000..2cd6f0fad4 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.pipeline.parser.rule; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Arrays; +import java.util.List; + +class RuleTransformerModelTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + void testSerialization() throws Exception { + List applyWhen = Arrays.asList("condition1", "condition2"); + RuleTransformerModel model = new RuleTransformerModel(applyWhen); + + String json = objectMapper.writeValueAsString(model); + assertNotNull(json, "Serialized JSON should not be null"); + } + + @Test + void testDeserialization() throws Exception { + String json = "{\"apply_when\":[\"condition1\",\"condition2\"]}"; + + RuleTransformerModel model = objectMapper.readValue(json, RuleTransformerModel.class); + assertNotNull(model, "Deserialized model should not be null"); + assertEquals(2, model.getApplyWhen().size(), "ApplyWhen should contain two conditions"); + assertEquals("condition1", model.getApplyWhen().get(0), "The first condition should be 'condition1'"); + assertEquals("condition2", model.getApplyWhen().get(1), "The second condition should be 'condition2'"); + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java new file mode 100644 index 0000000000..68dae1b662 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java @@ -0,0 +1,209 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.pipeline.parser.transformer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; +import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader; 
+import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationReader; +import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser; +import org.opensearch.dataprepper.pipeline.parser.TestConfigurationProvider; +import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({System.class}) +class DynamicConfigTransformerTest { + + private final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory() + .disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER)); + private final String RULES_DIRECTORY_PATH = "src/test/resources/transformation/rules"; + private final String TEMPLATES_DIRECTORY_PATH = "src/test/resources/transformation/templates/testSource"; + + TransformersFactory transformersFactory; + RuleEvaluator ruleEvaluator; + + @Test + void test_successful_transformation_with_only_source_and_sink() throws IOException { + + String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCDB1_CONFIG_FILE; + String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCDB1_CONFIG_FILE; + String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; + String expectedDocDBFilePath = TestConfigurationProvider.EXPECTED_TRANSFORMATION_DOCDB1_CONFIG_FILE; + String pluginName = "documentdb"; + PipelineConfigurationReader pipelineConfigurationReader = new PipelineConfigurationFileReader(docDBUserConfig); + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(pipelineConfigurationReader); + + transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, + TEMPLATES_DIRECTORY_PATH)); + when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); + when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + ruleEvaluator = new RuleEvaluator(transformersFactory); + + // Load the original and template YAML files from the test resources directory + PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); + PipelineConfigurationTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + PipelinesDataFlowModel transformedModel = transformer.transformConfiguration(pipelinesDataFlowModel); + String transformedYaml = yamlMapper.writeValueAsString(transformedModel); + + Map transformedYamlasMap = yamlMapper.readValue(transformedYaml, Map.class); + Map expectedYamlasMap = yamlMapper.readValue(new File(expectedDocDBFilePath), Map.class); + assertThat(expectedYamlasMap).usingRecursiveComparison().isEqualTo(transformedYamlasMap); + } + + @Test + void test_successful_transformation_with_documentdb() throws IOException { + + String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; + String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; + String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; + String expectedDocDBFilePath = TestConfigurationProvider.EXPECTED_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; + String pluginName = "documentdb"; + PipelineConfigurationReader pipelineConfigurationReader = new 
PipelineConfigurationFileReader(docDBUserConfig);
+        final PipelinesDataflowModelParser pipelinesDataflowModelParser =
+                new PipelinesDataflowModelParser(pipelineConfigurationReader);
+
+        transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH,
+                TEMPLATES_DIRECTORY_PATH));
+        when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath);
+        when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath);
+        ruleEvaluator = new RuleEvaluator(transformersFactory);
+
+        // Load the original and template YAML files from the test resources directory
+        PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration();
+        PipelineConfigurationTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
+        PipelinesDataFlowModel transformedModel = transformer.transformConfiguration(pipelinesDataFlowModel);
+        String transformedYaml = yamlMapper.writeValueAsString(transformedModel);
+
+        Map transformedYamlasMap = yamlMapper.readValue(transformedYaml, Map.class);
+        Map expectedYamlasMap = yamlMapper.readValue(new File(expectedDocDBFilePath), Map.class);
+        assertThat(expectedYamlasMap).usingRecursiveComparison().isEqualTo(transformedYamlasMap);
+    }
+
+    @Test
+    void test_successful_transformation_with_subpipelines() throws IOException {
+
+        String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE;
+        String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE;
+        String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE;
+        String expectedDocDBFilePath = TestConfigurationProvider.EXPECTED_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE;
+        String pluginName = "documentdb";
+        PipelineConfigurationReader pipelineConfigurationReader = new PipelineConfigurationFileReader(docDBUserConfig);
+        final PipelinesDataflowModelParser pipelinesDataflowModelParser =
+                new PipelinesDataflowModelParser(pipelineConfigurationReader);
+
+        transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH,
+                TEMPLATES_DIRECTORY_PATH));
+        when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath);
+        when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath);
+        ruleEvaluator = new RuleEvaluator(transformersFactory);
+
+        // Load the original and template YAML files from the test resources directory
+        PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration();
+        PipelineConfigurationTransformer transformer = new DynamicConfigTransformer(ruleEvaluator);
+        PipelinesDataFlowModel transformedModel = transformer.transformConfiguration(pipelinesDataFlowModel);
+        String transformedYaml = yamlMapper.writeValueAsString(transformedModel);
+
+        Map transformedYamlasMap = yamlMapper.readValue(transformedYaml, Map.class);
+        Map expectedYamlasMap = yamlMapper.readValue(new File(expectedDocDBFilePath), Map.class);
+        assertThat(expectedYamlasMap).usingRecursiveComparison().isEqualTo(transformedYamlasMap);
+    }
+
+    @Test
+    void test_successful_transformation_with_functionPlaceholder() throws IOException {
+
+        String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE;
+        String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE;
+        String 
ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; + String expectedDocDBFilePath = TestConfigurationProvider.EXPECTED_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE; + String pluginName = "documentdb"; + PipelineConfigurationReader pipelineConfigurationReader = new PipelineConfigurationFileReader(docDBUserConfig); + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(pipelineConfigurationReader); + + transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, + TEMPLATES_DIRECTORY_PATH)); + when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); + when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + ruleEvaluator = new RuleEvaluator(transformersFactory); + + // Load the original and template YAML files from the test resources directory + PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); + PipelineConfigurationTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + PipelinesDataFlowModel transformedModel = transformer.transformConfiguration(pipelinesDataFlowModel); + String transformedYaml = yamlMapper.writeValueAsString(transformedModel); + + Map transformedYamlasMap = yamlMapper.readValue(transformedYaml, Map.class); + Map expectedYamlasMap = yamlMapper.readValue(new File(expectedDocDBFilePath), Map.class); + assertThat(expectedYamlasMap).usingRecursiveComparison().isEqualTo(transformedYamlasMap); + } + + + @Test + void test_successful_transformation_with_complete_template() throws IOException { + + String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCDB2_CONFIG_FILE; + String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCDB2_CONFIG_FILE; + String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; + String expectedDocDBFilePath = TestConfigurationProvider.EXPECTED_TRANSFORMATION_DOCDB2_CONFIG_FILE; + String pluginName = "documentdb"; + PipelineConfigurationReader pipelineConfigurationReader = new PipelineConfigurationFileReader(docDBUserConfig); + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(pipelineConfigurationReader); + + transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, + TEMPLATES_DIRECTORY_PATH)); + when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); + when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + ruleEvaluator = new RuleEvaluator(transformersFactory); + + // Load the original and template YAML files from the test resources directory + PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); + PipelineConfigurationTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + PipelinesDataFlowModel transformedModel = transformer.transformConfiguration(pipelinesDataFlowModel); + String transformedYaml = yamlMapper.writeValueAsString(transformedModel); + + Map transformedYamlasMap = yamlMapper.readValue(transformedYaml, Map.class); + + Map expectedYamlasMap = yamlMapper.readValue(new File(expectedDocDBFilePath), Map.class); + assertThat(expectedYamlasMap).usingRecursiveComparison().isEqualTo(transformedYamlasMap); + } + + @Test + void testInvalidJsonPathThrowsException() throws IOException { + 
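// The simple template uses a deep-scan ("$..") placeholder, which transformation does not support (see the transformation docs), so a RuntimeException is expected.
+        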
String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCDB_SIMPLE_CONFIG_FILE; + String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCDB_SIMPLE_CONFIG_FILE; + String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; + String pluginName = "documentdb"; + PipelineConfigurationReader pipelineConfigurationReader = new PipelineConfigurationFileReader(docDBUserConfig); + final PipelinesDataflowModelParser pipelinesDataflowModelParser = + new PipelinesDataflowModelParser(pipelineConfigurationReader); + + transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, + TEMPLATES_DIRECTORY_PATH)); + when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); + when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + ruleEvaluator = new RuleEvaluator(transformersFactory); + + // Load the original and template YAML files from the test resources directory + PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); + PipelineConfigurationTransformer transformer = new DynamicConfigTransformer(ruleEvaluator); + assertThrows(RuntimeException.class, () -> transformer.transformConfiguration(pipelinesDataFlowModel)); + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineTemplateModelTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineTemplateModelTest.java new file mode 100644 index 0000000000..3a0da61231 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/PipelineTemplateModelTest.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.pipeline.parser.transformer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class PipelineTemplateModelTest { + + private PipelineTemplateModel pipelineTemplateModel; + private Map pipelines; + + @BeforeEach + public void setup() { + pipelines = new HashMap<>(); + pipelines.put("examplePipeline", new Object()); + pipelineTemplateModel = new PipelineTemplateModel(pipelines); + } + + @Test + public void testConstructor_initializesMap() { + assertNotNull(pipelineTemplateModel.getTemplatePipelines()); + assertEquals(1, pipelineTemplateModel.getTemplatePipelines().size()); + assertTrue(pipelineTemplateModel.getTemplatePipelines().containsKey("examplePipeline")); + } + + @Test + public void testGetTemplatePipelines_returnsCorrectMap() { + Map retrievedMap = pipelineTemplateModel.getTemplatePipelines(); + assertEquals(pipelines, retrievedMap); + } + + @Test + public void testJsonAnySetter_addsDynamicProperties() throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + String json = "{\"extraProperty\":\"value\", \"anotherProperty\":123}"; + + PipelineTemplateModel modelFromJson = objectMapper.readValue(json, PipelineTemplateModel.class); + assertNotNull(modelFromJson.getTemplatePipelines()); + assertEquals("value", 
modelFromJson.getTemplatePipelines().get("extraProperty")); + assertEquals(123, modelFromJson.getTemplatePipelines().get("anotherProperty")); + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java new file mode 100644 index 0000000000..4b3b8df0e4 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.pipeline.parser.transformer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; + +public class TransformersFactoryTest { + + private TransformersFactory transformersFactory; + private final String templatesDirectoryPath = "src/test/resources/templates"; + private final String rulesDirectoryPath = "src/test/resources/rules"; + private final String validPluginName = "testPlugin"; + private final String invalidPluginName = ""; + + @BeforeEach + public void setUp() { + transformersFactory = new TransformersFactory(rulesDirectoryPath, templatesDirectoryPath); + } + + @Test + public void testGetPluginTemplateFileLocation_validPluginName() { + String expectedPath = templatesDirectoryPath + "/" + validPluginName + "-template.yaml"; + assertEquals(expectedPath, transformersFactory.getPluginTemplateFileLocation(validPluginName)); + } + + @Test + public void testGetPluginTemplateFileLocation_invalidPluginName() { + Exception exception = assertThrows(RuntimeException.class, () -> { + transformersFactory.getPluginTemplateFileLocation(invalidPluginName); + }); + assertEquals("Transformation plugin not found", exception.getMessage()); + } + + @Test + public void testGetPluginRuleFileLocation_validPluginName() { + String expectedPath = rulesDirectoryPath + "/" + validPluginName + "-rule.yaml"; + assertEquals(expectedPath, transformersFactory.getPluginRuleFileLocation(validPluginName)); + } + + @Test + public void testGetPluginRuleFileLocation_invalidPluginName() { + Exception exception = assertThrows(RuntimeException.class, () -> { + transformersFactory.getPluginRuleFileLocation(invalidPluginName); + }); + assertEquals("Transformation plugin not found", exception.getMessage()); + } + + @Test + public void testGetTemplateModel_throwsRuntimeExceptionOnIOException() throws IOException { + ObjectMapper mockedYamlMapper = Mockito.mock(ObjectMapper.class); + String templatePath = templatesDirectoryPath + "/" + validPluginName + "-template.yaml"; + File expectedFile = new File(templatePath); + + Mockito.when(mockedYamlMapper.readValue(Mockito.eq(expectedFile), Mockito.eq(PipelineTemplateModel.class))) + .thenThrow(new IOException("Test exception")); + + assertThrows(RuntimeException.class, () -> transformersFactory.getTemplateModel(validPluginName)); + } + + @Test + public void testGetTemplateModel_invalidPluginNameThrowsRuntimeException() { + assertThrows(RuntimeException.class, () -> transformersFactory.getTemplateModel(invalidPluginName), + "Should throw a RuntimeException for empty 
plugin name."); + } +} + diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-expected.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-expected.yaml new file mode 100644 index 0000000000..98c08da43c --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-expected.yaml @@ -0,0 +1,106 @@ +extension: + aws: + secrets: + docdb-secret: + secret_id: "" + region: "" + sts_role_arn: "arn:aws:iam:::role/" +dodb-pipeline-transformed: + workers: 2 + delay: 0 + source: + documentdb: + hostname: "" + collections: + - collection: "." + export_config: + items_per_partition: 4000 + ingestion_mode: "export_stream" + s3_bucket: "my-bucket" + s3_region: "us-east-1" + acknowledgments: false + buffer: + bounded_blocking: + batch_size: 125000 + buffer_size: 1000000 + route: + - initial_load: "getMetadata(\"ingestion_type\") == \"EXPORT\"" + - stream_load: "getMetadata(\"ingestion_type\") == \"STREAM\"" + sink: + - s3: + routes: + - "initial_load" + aws: + sts_role_arn: "arn:aws:iam:::role/" + region: "" + serverless: true + bucket: "my-bucket" + threshold: + event_collect_timeout: "120s" + maximum_size: "2mb" + aggregate_threshold: + maximum_size: "256mb" + flush_capacity_ratio: "0" + object_key: + path_prefix: "local-test/${getMetadata(\"s3_partition_key\")}" + codec: + json: null + - s3: + routes: + - "stream_load" + aws: + sts_role_arn: "arn:aws:iam:::role/" + region: "" + serverless: true + bucket: "my-bucket" + threshold: + event_collect_timeout: "30s" + maximum_size: "1mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "local-test/${getMetadata(\"s3_partition_key\")}" + codec: + json: null +dodb-pipeline-s3-sub-pipeline-transformed: + workers: 2 + delay: 0 + source: + s3: + delete_s3_objects_on_read: true + codec: + json: null + compression: "none" + aws: + sts_role_arn: "arn:aws:iam:::role/" + region: "" + serverless: true + acknowledgments: false + scan: + buckets: + - bucket: + name: "my-bucket" + filter: + include_prefix: + - "local-test" + scheduling: + interval: "1s" + buffer: + bounded_blocking: + batch_size: 125000 + buffer_size: 1000000 + sink: + - opensearch: + exclude_keys: + - "_id" + hosts: + - "" + action: "${getMetadata(\"opensearch_action\")}" + index: "" + aws: + sts_role_arn: "arn:aws:iam:::role/" + region: "" + serverless: true + document_id: "${getMetadata(\"primary_key\")}" + index_type: "custom" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-function-expected.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-function-expected.yaml new file mode 100644 index 0000000000..3506d43f77 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-function-expected.yaml @@ -0,0 +1,19 @@ +extension: + aws: + secrets: + docdb-secret: + secret_id: "" + region: "" + sts_role_arn: "arn:aws:iam:::role/" +docdb-pipeline-transformed: + source: + documentdb: + hostname: "host" + collections: + - collection: "database.collectionName" + acknowledgments: false + s3_prefix: "folder1/folder2" + sink: + - opensearch: + hosts: "host" + depth: "5" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-subpipelines-expected.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-subpipelines-expected.yaml new file mode 
100644 index 0000000000..a260d85a24 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb-subpipelines-expected.yaml @@ -0,0 +1,39 @@ +extension: + aws: + secrets: + docdb-secret: + secret_id: "arn:aws:secretsmanager:us-west-2:420497401461:secret:prod/docdb-ip1Mqf" + region: "us-west-2" + sts_role_arn: "arn:aws:iam::420497401461:role/Admin" + +pipeline1-transformed: + source: + documentdb: + host: "host1" + collections: + - collection: "docdbdemo.streamdemo" + s3_bucket: "osiddbtest1" + s3_region: "us-west-2" + processor: + - string_converter: + upper_case: true + sink: + - s3: + aws: + sts_role_arn: "arn:aws:iam::420497401461:role/Admin" + region: "us-west-2" + bucket: "osiddbtest1" + +pipeline2: + source: + someSource: + host: "host2" + processor: + - string_mutate: + lower_case: true + sink: + - s3: + aws: + sts_role_arn: "arn:aws:iam::420497401461:role/Admin" + region: "us-west-2" + bucket: "someSourceBucket" \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb1-expected.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb1-expected.yaml new file mode 100644 index 0000000000..c97e45809a --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb1-expected.yaml @@ -0,0 +1,16 @@ +simple-pipeline-transformed: + source: + documentdb: + hostname: "database.example.com" + port: "27017" + sink: + - opensearch: + hosts: "database.example.com" + port: "27017" + index: "my_index" + aws: + sts_role_arn: "arn123" + region: "us-test-1" + dlq: + s3: + bucket: "test-bucket" \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml new file mode 100644 index 0000000000..3895ee7803 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml @@ -0,0 +1,107 @@ +extension: + aws: + secrets: + secret: + secret_id: "my-docdb-secret" + region: "us-east-1" + refresh_interval: "PT1H" +documentdb-pipeline-reader: + source: + documentdb: + s3_region: "bucket-region" + collections: + - collection: "dbname.collection1" + export: "true" + stream: "true" + partition_count: 256 + export_batch_size: 10000 + stream_batch_size: 1000 + hosts: + - "docdb-2024-01-03-20-31-17.cluster-abcdef.us-east-1.docdb.amazonaws.com" + acknowledgments: true + aws: + sts_role_arn: "docdb-role" + s3_bucket: "bucket-name1" + authentication: + username: "${{aws_secrets:secret:password}}" + password: "${{aws_secrets:secret:password}}" + s3_prefix: "folder1/folder2" + routes: + - initial_load: "getMetadata(\"ingestion_type\") == \"EXPORT\"" + - stream_load: "getMetadata(\"ingestion_type\") == \"STREAM\"" + sink: + - s3: null + routes: + - "initial_load" + aws: + region: "bucket-region" + sts_role_arn: "docdb-role" + sts_external_id: null + sts_header_overrides: null + bucket: "bucket-name1" + threshold: + event_collect_timeout: "120s" + maximum_size: "2mb" + aggregate_threshold: + maximum_size: "256kb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + json: null + - s3: + routes: + - "stream_load" + aws: + region: "bucket-region" + sts_role_arn: "docdb-role" + bucket: "bucket-name1" + threshold: + event_collect_timeout: "30s" + maximum_size: "1mb" + 
        aggregate_threshold:
          maximum_size: "128mb"
        flush_capacity_ratio: 0
        object_key:
          path_prefix: "${getMetadata(\"s3_partition_key\")}"
        codec:
          json: null
documentdb-pipeline-writer:
  source:
    s3:
      codec:
        json: null
      compression: "none"
      aws:
        region: "bucket-region"
        sts_role_arn: "docdb-role"
        sts_external_id: null
        sts_header_overrides: null
      acknowledgments: true
      delete_s3_objects_on_read: true
      scan:
        buckets:
          - bucket:
              name: "bucket-name1"
              filter:
                depth: "5"
        scheduling:
          interval: "1s"
  processor:
    - string_converter:
        upper_case: true
  sink:
    - opensearch:
        exclude_keys:
          - "_id"
        document_version_type: "external"
        hosts:
          - "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com"
        action: "${getMetadata(\"opensearch_action\")}"
        index: "index_name"
        aws:
          sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
          region: "us-east-1"
        document_version: "${getMetadata(\"document_version\")}"
        document_id: "${getMetadata(\"primary_key\")}"
        index_type: "custom"
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml
new file mode 100644
index 0000000000..28da1406f8
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml
@@ -0,0 +1,3 @@
+apply_when:
+  - "$..source.documentdb"
+  - "$..source.documentdb.collections[0].s3_bucket"
\ No newline at end of file
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml
new file mode 100644
index 0000000000..cb10684065
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml
@@ -0,0 +1,2 @@
+apply_when:
+  - "$..source.documentdb"
\ No newline at end of file
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-function-template.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-function-template.yaml
new file mode 100644
index 0000000000..bb3fc7dd7a
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-function-template.yaml
@@ -0,0 +1,6 @@
+"<<pipeline-name>>-transformed":
+  source: "<<$.<<pipeline-name>>.source>>"
+  sink:
+    - opensearch:
+        hosts: "<<$.<<pipeline-name>>.source.documentdb.hostname>>"
+        depth: "<>.source.documentdb.s3_prefix>>"
\ No newline at end of file
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-simple-template.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-simple-template.yaml
new file mode 100644
index 0000000000..b69c6712d2
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-simple-template.yaml
@@ -0,0 +1,5 @@
+pipeline:
+  source: "<<$..source>>"
+  sink:
+    - s3:
+        bucket_name: "<<$.<<pipeline-name>>.source.hostname>>"
\ No newline at end of file
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-subpipelines-template.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-subpipelines-template.yaml
new file mode 100644
index 0000000000..98dae7e97a
--- /dev/null
+++ 
b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-subpipelines-template.yaml
@@ -0,0 +1,7 @@
+"<<pipeline-name>>-transformed":
+  source: "<<$.<<pipeline-name>>.source>>"
+  processor: "<<$.<<pipeline-name>>.processor>>"
+  sink:
+    - s3:
+        aws: "<<$.<<pipeline-name>>.sink[?(@.opensearch)].opensearch.aws>>"
+        bucket: "<<$.<<pipeline-name>>.source.documentdb.collections[0].s3_bucket>>"
\ No newline at end of file
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-template.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-template.yaml
new file mode 100644
index 0000000000..959fcd1fc2
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb-template.yaml
@@ -0,0 +1,68 @@
+"<<pipeline-name>>-transformed":
+  workers: 2
+  delay: 0
+  source: "<<$.<<pipeline-name>>.source>>"
+  buffer:
+    bounded_blocking:
+      batch_size: 125000
+      buffer_size: 1000000
+  route:
+    - initial_load: 'getMetadata("ingestion_type") == "EXPORT"'
+    - stream_load: 'getMetadata("ingestion_type") == "STREAM"'
+  sink:
+    - s3:
+        routes:
+          - initial_load
+        aws: "<<$.<<pipeline-name>>.sink[?(@.opensearch)].opensearch.aws>>"
+        bucket: "<<$.<<pipeline-name>>.source.documentdb.collections[0].s3_bucket>>"
+        threshold:
+          event_collect_timeout: "120s"
+          maximum_size: "2mb"
+        aggregate_threshold:
+          maximum_size: "256mb"
+          flush_capacity_ratio: "0"
+        object_key:
+          path_prefix: "local-test/${getMetadata(\"s3_partition_key\")}"
+        codec:
+          json:
+    - s3:
+        routes:
+          - stream_load
+        aws: "<<$.<<pipeline-name>>.sink[?(@.opensearch)].opensearch.aws>>"
+        bucket: "<<$.<<pipeline-name>>.source.documentdb.collections[0].s3_bucket>>"
+        threshold:
+          event_collect_timeout: "30s"
+          maximum_size: "1mb"
+        aggregate_threshold:
+          maximum_size: "128mb"
+          flush_capacity_ratio: 0
+        object_key:
+          path_prefix: "local-test/${getMetadata(\"s3_partition_key\")}"
+        codec:
+          json:
+
+"<<pipeline-name>>-s3-sub-pipeline-transformed":
+  workers: 2
+  delay: 0
+  source:
+    s3:
+      delete_s3_objects_on_read: true
+      codec:
+        json:
+      compression: "none"
+      aws: "<<$.<<pipeline-name>>.sink[?(@.opensearch)].opensearch.aws>>"
+      acknowledgments: false
+      scan:
+        buckets:
+          - bucket:
+              name: "<<$.<<pipeline-name>>.source.documentdb.collections[0].s3_bucket>>"
+              filter:
+                include_prefix: ["local-test"]
+        scheduling:
+          interval: "1s"
+  buffer:
+    bounded_blocking:
+      batch_size: 125000
+      buffer_size: 1000000
+  sink:
+    - opensearch: "<<$.<<pipeline-name>>.sink[?(@.opensearch)].opensearch>>"
\ No newline at end of file
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb1-template.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb1-template.yaml
new file mode 100644
index 0000000000..b0f1cfa2c1
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb1-template.yaml
@@ -0,0 +1,13 @@
+"<<pipeline-name>>-transformed":
+  source: "<<$.<<pipeline-name>>.source>>"
+  sink:
+    - opensearch:
+        hosts: "<<$.<<pipeline-name>>.source.documentdb.hostname>>"
+        port: "<<$.<<pipeline-name>>.source.documentdb.port>>"
+        index: "<<$.<<pipeline-name>>.sink[0].opensearch.index>>"
+        aws:
+          sts_role_arn: "arn123"
+          region: "us-test-1"
+        dlq:
+          s3:
+            bucket: "test-bucket"
\ No newline at end of file
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb2-template.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb2-template.yaml
new file mode 100644
index 0000000000..6bc03ea1ad
--- /dev/null
+++ 
b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb2-template.yaml
@@ -0,0 +1,67 @@
+"<<pipeline-name>>-reader":
+  source:
+    documentdb: "<<$.<<pipeline-name>>.source.documentdb>>"
+  routes:
+    - initial_load: 'getMetadata("ingestion_type") == "EXPORT"'
+    - stream_load: 'getMetadata("ingestion_type") == "STREAM"'
+  sink:
+    - s3:
+        routes:
+          - initial_load
+        aws:
+          region: "<<$.<<pipeline-name>>.source.documentdb.s3_region>>"
+          sts_role_arn: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_role_arn>>"
+          sts_external_id: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_external_id>>"
+          sts_header_overrides: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_header_overrides>>"
+        bucket: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>"
+        threshold:
+          event_collect_timeout: "120s"
+          maximum_size: "2mb"
+        aggregate_threshold:
+          maximum_size: "256kb"
+          flush_capacity_ratio: 0
+        object_key:
+          path_prefix: "${getMetadata(\"s3_partition_key\")}"
+        codec:
+          json:
+    - s3:
+        routes:
+          - stream_load
+        aws:
+          region: "<<$.<<pipeline-name>>.source.documentdb.s3_region>>"
+          sts_role_arn: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_role_arn>>"
+        bucket: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>"
+        threshold:
+          event_collect_timeout: "30s"
+          maximum_size: "1mb"
+        aggregate_threshold:
+          maximum_size: "128mb"
+          flush_capacity_ratio: 0
+        object_key:
+          path_prefix: "${getMetadata(\"s3_partition_key\")}"
+        codec:
+          json:
+
+"<<pipeline-name>>-writer":
+  source:
+    s3:
+      codec:
+        json:
+      compression: "none"
+      aws:
+        region: "<<$.<<pipeline-name>>.source.documentdb.s3_region>>"
+        sts_role_arn: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_role_arn>>"
+        sts_external_id: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_external_id>>"
+        sts_header_overrides: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_header_overrides>>"
+      acknowledgments: true
+      delete_s3_objects_on_read: true
+      scan:
+        buckets:
+          - bucket:
+              name: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>"
+              filter:
+                depth: "<>.source.documentdb.s3_prefix>>"
+        scheduling:
+          interval: "1s"
+  processor: "<<$.<<pipeline-name>>.processor>>"
+  sink: "<<$.<<pipeline-name>>.sink>>"
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-function-userconfig.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-function-userconfig.yaml
new file mode 100644
index 0000000000..1d1081c382
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-function-userconfig.yaml
@@ -0,0 +1,20 @@
+version: 2
+pipeline_configurations:
+  aws:
+    secrets:
+      docdb-secret:
+        secret_id: ""
+        region: ""
+        sts_role_arn: "arn:aws:iam:::role/"
+docdb-pipeline:
+  source:
+    documentdb:
+      hostname: "host"
+      acknowledgments: false
+      s3_prefix: "folder1/folder2"
+      collections:
+        - collection: "database.collectionName"
+  sink:
+    - opensearch:
+        hosts: [""]
+        index: ""
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-simple-userconfig.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-simple-userconfig.yaml
new file mode 100644
index 0000000000..6c64ad095c
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-simple-userconfig.yaml
@@ -0,0 +1,9 @@
+dodb-pipeline:
+  source:
+    documentdb:
+      hostname: "database.example.com"
+  sink:
+    - s3:
+        bucket_name: "test_bucket"
+        aws:
+          region: "us-test-1"
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-subpipelines-userconfig.yaml 
b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-subpipelines-userconfig.yaml new file mode 100644 index 0000000000..9feeaeca6f --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-subpipelines-userconfig.yaml @@ -0,0 +1,53 @@ +version: "2" +pipeline_configurations: + aws: + secrets: + docdb-secret: + secret_id: "arn:aws:secretsmanager:us-west-2:420497401461:secret:prod/docdb-ip1Mqf" + region: "us-west-2" + sts_role_arn: "arn:aws:iam::420497401461:role/Admin" +pipeline1: + source: + documentdb: + host: "host1" + collections: + - collection: "docdbdemo.streamdemo" + s3_bucket: "osiddbtest1" + s3_region: "us-west-2" + processor: + - string_converter: + upper_case: true + sink: + - s3: + aws: + sts_role_arn: "arn:aws:iam::420497401461:role/Admin" + region: "us-west-2" + bucket: "osiddbtest2" + acknowledgments: false + scan: + buckets: + - bucket: + name: "osiddbtest3" + filter: + include_prefix: ["local-test-3"] + - opensearch: + aws: + sts_role_arn: "arn:aws:iam::420497401461:role/Admin" + region: "us-west-2" + bucket: "osiddbtest3" + acknowledgments: false + hosts: ["host1"] + +pipeline2: + source: + someSource: + host: "host2" + processor: + - string_mutate: + lower_case: true + sink: + - s3: + aws: + sts_role_arn: "arn:aws:iam::420497401461:role/Admin" + region: "us-west-2" + bucket: "someSourceBucket" \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-userconfig.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-userconfig.yaml new file mode 100644 index 0000000000..4aa20782d7 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb-userconfig.yaml @@ -0,0 +1,40 @@ +version: 2 +pipeline_configurations: + aws: + secrets: + docdb-secret: + secret_id: "" + region: "" + sts_role_arn: "arn:aws:iam:::role/" +dodb-pipeline: + source: + documentdb: + hostname: "" + acknowledgments: false + collections: + - collection: "." + export_config: + items_per_partition: 4000 + ingestion_mode: export_stream + s3_bucket: "my-bucket" + # Specify the region of the S3 bucket + s3_region: "us-east-1" + sink: + - opensearch: + # REQUIRED: Provide an AWS OpenSearch endpoint + hosts: [ "" ] + # Link to documentation and recommendations for naming index and document_id + index: "" + index_type: custom + document_id: "${getMetadata(\"primary_key\")}" + action: "${getMetadata(\"opensearch_action\")}" + exclude_keys: [ "_id"] + aws: + # REQUIRED: Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com + sts_role_arn: "arn:aws:iam:::role/" + # Provide the region of the domain. 
+          region: ""
+          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
+          serverless: true
+          #serverless_options:
+            #network_policy_name: ""
\ No newline at end of file
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb1-userconfig.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb1-userconfig.yaml
new file mode 100644
index 0000000000..f5d0a3c57e
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb1-userconfig.yaml
@@ -0,0 +1,9 @@
+simple-pipeline:
+  source:
+    documentdb:
+      hostname: "database.example.com"
+      port: "27017"
+  sink:
+    - opensearch:
+        hosts: ["https://search-service.example.com"]
+        index: "my_index"
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb2-userconfig.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb2-userconfig.yaml
new file mode 100644
index 0000000000..0b43b22c5c
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/userConfig/documentdb2-userconfig.yaml
@@ -0,0 +1,45 @@
+documentdb-pipeline:
+  source:
+    documentdb:
+      acknowledgments: true
+      hosts: ["docdb-2024-01-03-20-31-17.cluster-abcdef.us-east-1.docdb.amazonaws.com"]
+      authentication:
+        username: ${{aws_secrets:secret:password}}
+        password: ${{aws_secrets:secret:password}}
+      aws:
+        sts_role_arn: "docdb-role"
+      s3_bucket: "bucket-name1"
+      s3_region: "bucket-region"
+      s3_prefix: "folder1/folder2" # pipeline_name + collection + timestamp
+      collections:
+        - collection: "dbname.collection1"
+          export: "true" # full_load
+          stream: "true" # cdc
+          # OSI only configuration and not exposed to customer
+          partition_count: 256
+          export_batch_size: 10000
+          stream_batch_size: 1000
+  processor:
+    - string_converter:
+        upper_case: true
+  sink:
+    - opensearch:
+        hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
+        index: "index_name"
+        index_type: custom
+        exclude_keys: [ "_id"]
+        document_id: "${getMetadata(\"primary_key\")}"
+        action: "${getMetadata(\"opensearch_action\")}"
+        document_version: "${getMetadata(\"document_version\")}"
+        document_version_type: "external"
+        aws:
+          sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
+          region: "us-east-1"
+
+extension:
+  aws:
+    secrets:
+      secret:
+        secret_id: "my-docdb-secret"
+        region: "us-east-1"
+        refresh_interval: PT1H
\ No newline at end of file
diff --git a/docs/pipeline_configuration_transformation.md b/docs/pipeline_configuration_transformation.md
new file mode 100644
index 0000000000..2b35804463
--- /dev/null
+++ b/docs/pipeline_configuration_transformation.md
@@ -0,0 +1,75 @@
+## Pipeline Configuration Transformation
+Supports transforming a user-provided pipeline configuration into a new configuration
+based on templates and rules.
+
+## Usage
+
+The user-provided configuration is evaluated against the rules; when a rule matches,
+the corresponding template is dynamically chosen and applied.
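+
+For illustration, the rule check can be thought of as evaluating each `apply_when`
+JsonPath against the pipelines JSON. The sketch below is hypothetical (the class name
+`RuleCheckSketch` is invented, and the real `RuleEvaluator` also resolves rule files
+through `TransformersFactory`); it assumes the Jayway JsonPath library:
+
+```java
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
+
+import java.util.List;
+
+public class RuleCheckSketch {
+    // A rule applies when every apply_when JsonPath matches something in the pipelines JSON.
+    public static boolean applies(final String pipelinesJson, final List<String> applyWhen) {
+        final Object document = Configuration.defaultConfiguration().jsonProvider().parse(pipelinesJson);
+        for (final String expression : applyWhen) {
+            try {
+                final Object match = JsonPath.read(document, expression);
+                // Deep-scan paths return a (possibly empty) list instead of throwing.
+                if (match == null || (match instanceof List && ((List<?>) match).isEmpty())) {
+                    return false;
+                }
+            } catch (final PathNotFoundException e) {
+                return false; // definite path absent, so the rule does not apply
+            }
+        }
+        return true;
+    }
+
+    public static void main(final String[] args) {
+        final String json = "{\"pipeline1\":{\"source\":{\"documentdb\":{\"hostname\":\"example\"}}}}";
+        System.out.println(applies(json, List.of("$..source.documentdb"))); // prints true
+    }
+}
+```
+
+The end-to-end example below shows the inputs and output of one such transformation.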
+
+**User config**
+```yaml
+simple-pipeline:
+  source:
+    someSource:
+      hostname: "database.example.com"
+      port: "27017"
+  sink:
+    - opensearch:
+        hosts: ["https://search-service.example.com"]
+        index: "my_index"
+```
+
+**Template**
+```yaml
+"<<pipeline-name>>-transformed":
+  source: "<<$.*.someSource>>"
+  sink:
+    - opensearch:
+        hosts: "<<$.*.sink[?(@.opensearch)].opensearch.hosts>>"
+        port: "<<$.*.someSource.port>>"
+        index: "<<$.*.sink[0].opensearch.index>>"
+        aws:
+          sts_role_arn: "arn123"
+          region: "us-test-1"
+        dlq:
+          s3:
+            bucket: "test-bucket"
+```
+
+**Rule**
+```yaml
+apply_when:
+  - "$..source.someSource"
+```
+
+**Expected Transformed Config**
+```yaml
+simple-pipeline-transformed:
+  source:
+    someSource:
+      hostname: "database.example.com"
+      port: "27017"
+  sink:
+    - opensearch:
+        hosts: ["https://search-service.example.com"]
+        port: "27017"
+        index: "my_index"
+        aws:
+          sts_role_arn: "arn123"
+          region: "us-test-1"
+        dlq:
+          s3:
+            bucket: "test-bucket"
+```
+
+### Assumptions
+1. In the template definition, deep-scan (recursive) expressions like `$..` are NOT supported. Always use a more specific expression;
+when specific segments of a path are not known, use wildcards.
+2. Users may provide multiple pipelines in their configuration, but only one of those pipelines can support transformation.
+3. A single pipeline cannot undergo multiple transformations.
+4. `<<$ .. >>` is the placeholder format in the template.
+`<<pipeline-name>>` is handled differently from the other placeholders,
+which are JsonPaths.
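+
+### Placeholder resolution (sketch)
+
+For illustration only, here is a minimal, hypothetical sketch of how a single template
+line could be resolved against the parsed user config once a rule has matched. It
+assumes the Jayway JsonPath library and scalar values; the helper name
+`resolvePlaceholders` is invented, and the real transformer also splices structured
+values (whole maps and lists), not just scalars:
+
+```java
+import com.jayway.jsonpath.JsonPath;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class PlaceholderSketch {
+    // Matches "<<$...>>" JsonPath placeholders after <<pipeline-name>> substitution.
+    private static final Pattern PLACEHOLDER = Pattern.compile("<<\\s*(\\$[^>]+)>>");
+
+    public static String resolvePlaceholders(final String templateLine,
+                                             final String pipelineName,
+                                             final Object userConfigJson) {
+        // <<pipeline-name>> is a plain token, so substitute it before evaluating JsonPaths.
+        final String withName = templateLine.replace("<<pipeline-name>>", pipelineName);
+        final Matcher matcher = PLACEHOLDER.matcher(withName);
+        final StringBuffer resolved = new StringBuffer();
+        while (matcher.find()) {
+            // Evaluate the embedded JsonPath against the user config and splice in the value.
+            final Object value = JsonPath.read(userConfigJson, matcher.group(1));
+            matcher.appendReplacement(resolved, Matcher.quoteReplacement(String.valueOf(value)));
+        }
+        matcher.appendTail(resolved);
+        return resolved.toString();
+    }
+}
+```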