From 38f807924d508c7b3a6bad681e27007bdb3774ee Mon Sep 17 00:00:00 2001 From: Srikanth Govindarajan Date: Fri, 16 Aug 2024 18:10:52 -0700 Subject: [PATCH] Add support for dynamic rule detection for pipeline config transformation (#4601) * Add support for dynamic rule detection for pipeline config transformation Signed-off-by: Srikanth Govindarajan * Address comments Signed-off-by: Srikanth Govindarajan * Move rules and templates to plugin level Signed-off-by: Srikanth Govindarajan * Add dummy plugin for testing dynamic rule detection Signed-off-by: Srikanth Govindarajan * Address comments Signed-off-by: Srikanth Govindarajan --------- Signed-off-by: Srikanth Govindarajan Signed-off-by: Srikanth Govindarajan --- .../PipelineTransformationConfiguration.java | 31 +--- .../pipeline/parser/rule/RuleEvaluator.java | 124 +++++++------- .../parser/rule/RuleFileEvaluation.java | 18 +++ .../pipeline/parser/rule/RuleStream.java | 26 +++ .../parser/rule/RuleTransformerModel.java | 21 +-- .../pipeline/parser/rule/TemplateStream.java | 20 +++ .../transformer/TransformersFactory.java | 148 ++++++++++------- .../templates/documentdb-template.yaml | 81 ---------- .../parser/rule/RuleEvaluatorTest.java | 69 ++++---- .../parser/rule/RuleTransformerModelTest.java | 6 +- .../DynamicConfigTransformerTest.java | 152 ++++++++++-------- .../transformer/TransformersFactoryTest.java | 96 ++++++----- .../transforms/rules/test-plugin-rule.yaml | 3 + .../templates/test-plugin-template.yaml | 4 + .../transformation/rules/documentdb-rule.yaml | 3 +- .../rules/documentdb1-rule.yaml | 1 + data-prepper-plugins/dummy-plugin/README.md | 63 ++++++++ .../dummy-plugin/build.gradle | 0 .../transforms/rules/dummy-plugin-rule.yaml | 3 + .../templates/dummy-plugin-template.yaml | 9 ++ .../transforms}/rules/documentdb-rule.yaml | 1 + .../templates/documentdb-template.yaml | 81 ++++++++++ settings.gradle | 3 +- 23 files changed, 574 insertions(+), 389 deletions(-) create mode 100644 
data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java create mode 100644 data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleStream.java create mode 100644 data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/TemplateStream.java delete mode 100644 data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml create mode 100644 data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/rules/test-plugin-rule.yaml create mode 100644 data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/templates/test-plugin-template.yaml create mode 100644 data-prepper-plugins/dummy-plugin/README.md create mode 100644 data-prepper-plugins/dummy-plugin/build.gradle create mode 100644 data-prepper-plugins/dummy-plugin/src/main/resources/org/opensearch/dataprepper/transforms/rules/dummy-plugin-rule.yaml create mode 100644 data-prepper-plugins/dummy-plugin/src/main/resources/org/opensearch/dataprepper/transforms/templates/dummy-plugin-template.yaml rename {data-prepper-pipeline-parser/src/main/resources => data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms}/rules/documentdb-rule.yaml (74%) create mode 100644 data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/documentdb-template.yaml diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java index 9cf83b11d7..aa7d593bfb 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java +++ 
b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java @@ -9,39 +9,12 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.inject.Named; -import java.nio.file.Path; -import java.nio.file.Paths; - @Configuration public class PipelineTransformationConfiguration { - public static final String TEMPLATES_DIRECTORY_PATH = "TEMPLATES_DIRECTORY_PATH"; - public static final String RULES_DIRECTORY_PATH = "RULES_DIRECTORY_PATH"; - private static final Path currentDir = Paths.get(System.getProperty("user.dir")); -// private static final String parserRelativePath = "/data-prepper-pipeline-parser/src"; - - @Bean - @Named(RULES_DIRECTORY_PATH) - static String provideRulesDirectoryPath() { - ClassLoader classLoader = PipelineTransformationConfiguration.class.getClassLoader(); - String filePath = classLoader.getResource("rules").getFile(); - return filePath; - } - - @Bean - @Named(TEMPLATES_DIRECTORY_PATH) - static String provideTemplateDirectoryPath() { - ClassLoader classLoader = PipelineTransformationConfiguration.class.getClassLoader(); - String filePath = classLoader.getResource("templates").getFile(); - return filePath; - } @Bean - TransformersFactory transformersFactory( - @Named(RULES_DIRECTORY_PATH) String rulesDirectoryPath, - @Named(TEMPLATES_DIRECTORY_PATH) String templatesDirectoryPath - ) { - return new TransformersFactory(rulesDirectoryPath, templatesDirectoryPath); + TransformersFactory transformersFactory() { + return new TransformersFactory(); } @Bean diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java index e546d5fd10..5e2c50c4ff 100644 --- 
a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java @@ -5,15 +5,14 @@ package org.opensearch.dataprepper.pipeline.parser.rule; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; -import com.jayway.jsonpath.ParseContext; import com.jayway.jsonpath.PathNotFoundException; -import com.jayway.jsonpath.ReadContext; -import com.jayway.jsonpath.spi.json.JacksonJsonProvider; +import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider; import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; @@ -25,6 +24,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -34,40 +34,29 @@ public class RuleEvaluator { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); private final TransformersFactory transformersFactory; - private String PLUGIN_NAME = null; + public RuleEvaluator(TransformersFactory transformersFactory) { this.transformersFactory = transformersFactory; } public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelineModel) { - //TODO - Dynamically scan the rules folder and get the corresponding template. 
- return isDocDBSource(pipelineModel); - } - /** - * Evaluates model based on pre defined rules and - * result contains the name of the pipeline that will need transformation, - * evaluated boolean result and the corresponding template model - * Assumption: only one pipeline can have transformation. - * - * @param pipelinesModel - * @return RuleEvaluatorResult - */ - private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel) { - PLUGIN_NAME = "documentdb"; - - Map pipelines = pipelinesModel.getPipelines(); + Map pipelines = pipelineModel.getPipelines(); for (Map.Entry entry : pipelines.entrySet()) { try { String pipelineJson = OBJECT_MAPPER.writeValueAsString(entry); - if (evaluate(pipelineJson, PLUGIN_NAME)) { - LOG.info("Rule for {} is evaluated true for pipelineJson {}", PLUGIN_NAME, pipelineJson); + RuleFileEvaluation ruleFileEvaluation = evaluate(pipelineJson); + + if (ruleFileEvaluation.getResult()) { + String pluginName = ruleFileEvaluation.getPluginName(); + LOG.info("Applying rule {}",ruleFileEvaluation.getRuleFileName().toString()); + LOG.info("Rule for {} is evaluated true for pipelineJson {}", pluginName, pipelineJson); - InputStream templateStream = transformersFactory.getPluginTemplateFileStream(PLUGIN_NAME); + InputStream templateStream = transformersFactory.getPluginTemplateFileStream(pluginName); PipelineTemplateModel templateModel = yamlMapper.readValue(templateStream, PipelineTemplateModel.class); - LOG.info("Template is chosen for {}", PLUGIN_NAME); + LOG.info("Template is chosen for {}", pluginName); return RuleEvaluatorResult.builder() .withEvaluatedResult(true) @@ -75,11 +64,7 @@ private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel) .withPipelineName(entry.getKey()) .build(); } - } catch (FileNotFoundException e){ - LOG.error("Template File Not Found for {}", PLUGIN_NAME); - throw new RuntimeException(e); - } - catch (JsonProcessingException e) { + } catch (JsonProcessingException e) { 
LOG.error("Error processing json"); throw new RuntimeException(e); } catch (IOException e) { @@ -94,47 +79,68 @@ private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel) .build(); } - private Boolean evaluate(String pipelinesJson, - String pluginName) { - + private RuleFileEvaluation evaluate(String pipelinesJson) { Configuration parseConfig = Configuration.builder() - .jsonProvider(new JacksonJsonProvider()) + .jsonProvider(new JacksonJsonNodeJsonProvider()) .mappingProvider(new JacksonMappingProvider()) - .options(Option.AS_PATH_LIST) + .options(Option.SUPPRESS_EXCEPTIONS) .build(); - ParseContext parseContext = JsonPath.using(parseConfig); - ReadContext readPathContext = parseContext.parse(pipelinesJson); RuleTransformerModel rulesModel = null; - InputStream ruleStream = null; + try { - ruleStream = transformersFactory.getPluginRuleFileStream(pluginName); + Collection ruleStreams = transformersFactory.loadRules(); - rulesModel = yamlMapper.readValue(ruleStream, RuleTransformerModel.class); - List rules = rulesModel.getApplyWhen(); - for (String rule : rules) { - try { - Object result = readPathContext.read(rule); - } catch (PathNotFoundException e) { - LOG.warn("Json Path not found for {}", pluginName); - return false; - } - } - } catch (FileNotFoundException e){ - LOG.warn("Rule File Not Found for {}", pluginName); - return false; - } catch(IOException e){ - throw new RuntimeException(e); - }finally { - if (ruleStream != null) { + //walk through all rules and return first valid + for (RuleStream ruleStream : ruleStreams) { try { + rulesModel = yamlMapper.readValue(ruleStream.getRuleStream(), RuleTransformerModel.class); + List rules = rulesModel.getApplyWhen(); + String pluginName = rulesModel.getPluginName(); + boolean allRulesValid = true; + + for (String rule : rules) { + try { + JsonNode result = JsonPath.using(parseConfig).parse(pipelinesJson).read(rule); + if (result == null || result.size() == 0) { + allRulesValid = false; + 
break; + } + } catch (PathNotFoundException e) { + LOG.debug("Json Path not found for {}", ruleStream.getName()); + allRulesValid = false; + break; + } + } + + if (allRulesValid) { + return RuleFileEvaluation.builder() + .withPluginName(pluginName) + .withRuleFileName(ruleStream.getName()) + .withResult(true) + .build(); + } + } finally { ruleStream.close(); - } catch (IOException e) { - throw new RuntimeException(e); } } + + } catch (FileNotFoundException e) { + LOG.debug("Rule File Not Found", e); + return RuleFileEvaluation.builder() + .withPluginName(null) + .withRuleFileName(null) + .withResult(false) + .build(); + } catch (IOException e) { + throw new RuntimeException(e); } - return true; + + return RuleFileEvaluation.builder() + .withPluginName(null) + .withRuleFileName(null) + .withResult(false) + .build(); } -} +} \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java new file mode 100644 index 0000000000..88ee0f8d6c --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleFileEvaluation.java @@ -0,0 +1,18 @@ +package org.opensearch.dataprepper.pipeline.parser.rule; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +@Builder(setterPrefix = "with") +@AllArgsConstructor +@Data +public class RuleFileEvaluation { + private Boolean result; + private String ruleFileName; + private String pluginName; + + public RuleFileEvaluation() { + + } +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleStream.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleStream.java new file mode 100644 index 0000000000..6d467787fa --- /dev/null +++ 
b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleStream.java @@ -0,0 +1,26 @@ +package org.opensearch.dataprepper.pipeline.parser.rule; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.IOException; +import java.io.InputStream; + +@Data +@AllArgsConstructor +public class RuleStream { + private String name; + private InputStream ruleStream; + + + public void close() { + if (ruleStream != null) { + try { + ruleStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} + diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java index 0ad9e45b72..aaf757cedb 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModel.java @@ -4,36 +4,31 @@ */ package org.opensearch.dataprepper.pipeline.parser.rule; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; import lombok.Data; import java.util.List; @Data +@AllArgsConstructor public class RuleTransformerModel { + @JsonInclude(JsonInclude.Include.NON_EMPTY) @JsonProperty("apply_when") private List applyWhen; - public RuleTransformerModel() { - } - - public RuleTransformerModel(List applyWhen) { - this.applyWhen = applyWhen; - } + @JsonProperty("plugin_name") + private String pluginName; - public List getApplyWhen() { - return applyWhen; - } - - public void setApplyWhen(List applyWhen) { - this.applyWhen = applyWhen; + public RuleTransformerModel() { } @Override public String toString() { return "RuleConfiguration{" + "applyWhen=" + applyWhen + - '}'; + 
"\npluginName="+ pluginName +'}'; } } \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/TemplateStream.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/TemplateStream.java new file mode 100644 index 0000000000..2fc238d796 --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/TemplateStream.java @@ -0,0 +1,20 @@ +package org.opensearch.dataprepper.pipeline.parser.rule; + +import java.io.IOException; +import java.io.InputStream; + +public class TemplateStream { + private String name; + private InputStream templateStream; + + + public void close() { + if (templateStream != null) { + try { + templateStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java index 95741e9cf6..e903888a49 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java @@ -4,84 +4,116 @@ */ package org.opensearch.dataprepper.pipeline.parser.transformer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import static org.opensearch.dataprepper.pipeline.parser.PipelineTransformationConfiguration.RULES_DIRECTORY_PATH; -import static org.opensearch.dataprepper.pipeline.parser.PipelineTransformationConfiguration.TEMPLATES_DIRECTORY_PATH; - -import javax.inject.Named; -import java.io.File; -import java.io.FileNotFoundException; +import 
org.opensearch.dataprepper.pipeline.parser.rule.RuleStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.InputStream; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.FileSystem; +import java.nio.file.FileSystemNotFoundException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.List; +import java.util.stream.Stream; -public class TransformersFactory implements PipelineTransformationPathProvider { - - private static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory()); +public class TransformersFactory { + private static final Logger LOG = LoggerFactory.getLogger(TransformersFactory.class); + private static final String TEMPLATES_PATH = "org/opensearch/dataprepper/transforms/templates/"; + private static final String RULES_PATH = "org/opensearch/dataprepper/transforms/rules/"; private final String TEMPLATE_FILE_NAME_PATTERN = "-template.yaml"; private final String RULE_FILE_NAME_PATTERN = "-rule.yaml"; - private final String templatesDirectoryPath; - private final String rulesDirectoryPath; - - public TransformersFactory(@Named(RULES_DIRECTORY_PATH) final String rulesDirectoryPath, - @Named(TEMPLATES_DIRECTORY_PATH) final String templatesDirectoryPath) { - this.templatesDirectoryPath = templatesDirectoryPath; - this.rulesDirectoryPath = rulesDirectoryPath; - } - @Override - public String getTransformationTemplateDirectoryLocation() { - return templatesDirectoryPath; + public TransformersFactory(){ } - @Override - public String getTransformationRulesDirectoryLocation() { - return rulesDirectoryPath; - } - public String getPluginTemplateFileLocation(String pluginName) { - if(pluginName == null || pluginName.isEmpty()){ - throw new 
RuntimeException("Transformation plugin not found"); + public InputStream getPluginTemplateFileStream(String pluginName) { + if (pluginName == null || pluginName.isEmpty()) { + throw new RuntimeException("Transformation plugin not found"); } - return templatesDirectoryPath + "/" + pluginName + TEMPLATE_FILE_NAME_PATTERN; - } - public String getPluginRuleFileLocation(String pluginName) { - if(pluginName == null || pluginName.isEmpty()){ - throw new RuntimeException("Transformation plugin not found"); - } - return rulesDirectoryPath + "/" + pluginName + RULE_FILE_NAME_PATTERN; - } + // Construct the expected file name + String templateFileName = pluginName + TEMPLATE_FILE_NAME_PATTERN; - public InputStream getPluginRuleFileStream(String pluginName) { - if(pluginName == null || pluginName.isEmpty()){ - throw new RuntimeException("Transformation plugin not found"); + // Use the ClassLoader to find the template file on the classpath + ClassLoader classLoader = getClass().getClassLoader(); + URL templateURL = classLoader.getResource(TEMPLATES_PATH + templateFileName); + + if (templateURL == null) { + throw new RuntimeException("Template file not found for plugin: " + pluginName); } - ClassLoader classLoader = TransformersFactory.class.getClassLoader(); - InputStream filestream = classLoader.getResourceAsStream("rules" + "/" + pluginName + RULE_FILE_NAME_PATTERN); - return filestream; - } - public InputStream getPluginTemplateFileStream(String pluginName) { - if(pluginName == null || pluginName.isEmpty()){ - throw new RuntimeException("Transformation plugin not found"); + try { + // Convert the URL to a URI, then to a Path to read the file + Path templatePath; + try { + templatePath = Paths.get(templateURL.toURI()); + } catch (FileSystemNotFoundException e) { + // Handle the case where the file system is not accessible (e.g., in a JAR) + FileSystem fileSystem = FileSystems.newFileSystem(templateURL.toURI(), Collections.emptyMap()); + templatePath = 
fileSystem.getPath(TEMPLATES_PATH + templateFileName); + } + + // Return an InputStream for the found file + return Files.newInputStream(templatePath); + + } catch (IOException | URISyntaxException e) { + throw new RuntimeException("Failed to load template file for plugin: " + pluginName, e); } - ClassLoader classLoader = TransformersFactory.class.getClassLoader(); - InputStream filestream = classLoader.getResourceAsStream("templates" + "/" + pluginName + TEMPLATE_FILE_NAME_PATTERN); - return filestream; } - - public PipelineTemplateModel getTemplateModel(String pluginName) { - String templatePath = getPluginTemplateFileLocation(pluginName); + public Collection loadRules() { + List ruleStreams = new ArrayList<>(); + ClassLoader classLoader = getClass().getClassLoader(); try { - PipelineTemplateModel pipelineTemplateModel = YAML_MAPPER.readValue(new File(templatePath), PipelineTemplateModel.class); - return pipelineTemplateModel; - } catch (FileNotFoundException e) { - throw new RuntimeException(e); + // Use ClassLoader to find all resources that match the RULES_PATH pattern + Enumeration rulesURLs = classLoader.getResources(RULES_PATH); + + while (rulesURLs.hasMoreElements()) { + URL rulesURL = rulesURLs.nextElement(); + + try { + // Convert the URL to a URI, then to a Path to read the directory contents + Path rulesPath; + try { + rulesPath = Paths.get(rulesURL.toURI()); + } catch (FileSystemNotFoundException e) { + // Handle the case where the file system is not accessible (e.g., in a JAR) + FileSystem fileSystem = FileSystems.newFileSystem(rulesURL.toURI(), Collections.emptyMap()); + rulesPath = fileSystem.getPath(RULES_PATH); + } + + // Scan the directory for rule files + try (Stream paths = Files.walk(rulesPath)) { + paths.filter(Files::isRegularFile) + .forEach(rulePath -> { + try { + InputStream ruleInputStream = Files.newInputStream(rulePath); + ruleStreams.add(new RuleStream(rulePath.getFileName().toString(), ruleInputStream)); + } catch (IOException e) { 
+ throw new RuntimeException("Failed to load rule: " + rulePath, e); + } + }); + } + } catch (IOException | URISyntaxException e) { + throw new RuntimeException("Failed to scan rules directory on classpath: " + rulesURL, e); + } + } } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException("Failed to load rules from classpath.", e); } + + return ruleStreams; } + } diff --git a/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml b/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml deleted file mode 100644 index 0e0e6d5325..0000000000 --- a/data-prepper-pipeline-parser/src/main/resources/templates/documentdb-template.yaml +++ /dev/null @@ -1,81 +0,0 @@ -"<>": - workers: "<<$.<>.workers>>" - delay: "<<$.<>.delay>>" - buffer: "<<$.<>.buffer>>" - source: - documentdb: "<<$.<>.source.documentdb>>" - routes: - - initial_load: 'getMetadata("ingestion_type") == "EXPORT"' - - stream_load: 'getMetadata("ingestion_type") == "STREAM"' - sink: - - s3: - routes: - - initial_load - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" - bucket: "<<$.<>.source.documentdb.s3_bucket>>" - threshold: - event_collect_timeout: "120s" - maximum_size: "2mb" - aggregate_threshold: - maximum_size: "128mb" - flush_capacity_ratio: 0 - object_key: - path_prefix: "${getMetadata(\"s3_partition_key\")}" - codec: - event_json: - default_bucket_owner: "<>.source.documentdb.aws.sts_role_arn>>" - - s3: - routes: - - stream_load - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" - bucket: 
"<<$.<>.source.documentdb.s3_bucket>>" - threshold: - event_collect_timeout: "15s" - maximum_size: "1mb" - aggregate_threshold: - maximum_size: "128mb" - flush_capacity_ratio: 0 - object_key: - path_prefix: "${getMetadata(\"s3_partition_key\")}" - codec: - event_json: - default_bucket_owner: "<>.source.documentdb.aws.sts_role_arn>>" -"<>-s3": - workers: "<<$.<>.workers>>" - delay: "<<$.<>.delay>>" - buffer: "<<$.<>.buffer>>" - source: - s3: - codec: - event_json: - compression: "none" - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" - acknowledgments: true - delete_s3_objects_on_read: true - disable_s3_metadata_in_event: true - scan: - folder_partitions: - depth: "<>.source.documentdb.s3_prefix>>" - max_objects_per_ownership: 50 - buckets: - - bucket: - name: "<<$.<>.source.documentdb.s3_bucket>>" - filter: - include_prefix: ["<>.source.documentdb.s3_prefix>>"] - scheduling: - interval: "60s" - processor: "<<$.<>.processor>>" - sink: "<<$.<>.sink>>" - routes: "<<$.<>.routes>>" # In placeholder, routes or route (defined as alias) will be transformed to route in json as route will be primarily picked in pipelineModel. 
\ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java index 361feb3558..a7bf0f5e6c 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java @@ -5,10 +5,10 @@ package org.opensearch.dataprepper.pipeline.parser.rule; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import org.opensearch.dataprepper.model.configuration.PipelineExtensions; @@ -20,9 +20,9 @@ import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory; import java.io.FileInputStream; -import java.io.FileNotFoundException; +import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; +import java.nio.file.Paths; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -31,18 +31,16 @@ class RuleEvaluatorTest { @Test - void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() throws FileNotFoundException { + void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() throws IOException { - // Set up String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; + String ruleDocDBTemplatePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCDB1_CONFIG_FILE; String pluginName = "documentdb"; String pipelineName = "test-pipeline"; - Map sourceOptions = 
new HashMap(); - Map s3_bucket = new HashMap<>(); - s3_bucket.put("s3_bucket", "bucket-name"); - List collections = new ArrayList(); - collections.add(s3_bucket); - sourceOptions.put("collections", collections); + Map sourceOptions = new HashMap<>(); + Map s3Bucket = new HashMap<>(); + s3Bucket.put("s3_bucket", "bucket-name"); + sourceOptions.put("s3_bucket", s3Bucket); final PluginModel source = new PluginModel(pluginName, sourceOptions); final List processors = Collections.singletonList(new PluginModel("testProcessor", null)); final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); @@ -51,54 +49,55 @@ void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() throws FileN final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel( (PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel)); - TransformersFactory transformersFactory = Mockito.spy(new TransformersFactory( - TestConfigurationProvider.RULES_TRANSFORMATION_DIRECTORY, - TestConfigurationProvider.TEMPLATES_SOURCE_TRANSFORMATION_DIRECTORY - )); - RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); - when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); + TransformersFactory transformersFactory = mock(TransformersFactory.class); +// TransformersFactory transformersFactory = spy(new TransformersFactory("", "")); InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); - when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream); + InputStream templateStream = new FileInputStream(ruleDocDBTemplatePath); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + + List ruleStreams = Collections.singletonList(ruleInputStream); + when(transformersFactory.loadRules()).thenReturn(ruleStreams); + 
when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); + + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); RuleEvaluatorResult result = ruleEvaluator.isTransformationNeeded(pipelinesDataFlowModel); - // Assert assertTrue(result.isEvaluatedResult()); assertEquals(result.getPipelineName(), pipelineName); } @Test - void test_isTransformationNeeded_ForOtherSource_ShouldReturn_False() { - // Set up + void test_isTransformationNeeded_ForOtherSource_ShouldReturn_False() throws IOException { + + String pluginName = "http"; String pipelineName = "test-pipeline"; - Map sourceOptions = new HashMap(); - sourceOptions.put("option1", "1"); - sourceOptions.put("option2", null); - final PluginModel source = new PluginModel("http", sourceOptions); + Map sourceOptions = new HashMap<>(); + Map s3Bucket = new HashMap<>(); + s3Bucket.put("s3_bucket", "bucket-name"); + sourceOptions.put("s3_bucket", s3Bucket); + final PluginModel source = new PluginModel(pluginName, sourceOptions); final List processors = Collections.singletonList(new PluginModel("testProcessor", null)); final List sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null)); final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50); + TransformersFactory transformersFactory = mock(TransformersFactory.class); + final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel( (PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel)); - TransformersFactory transformersFactory = Mockito.spy(new TransformersFactory( - TestConfigurationProvider.RULES_TRANSFORMATION_DIRECTORY, - TestConfigurationProvider.TEMPLATES_SOURCE_TRANSFORMATION_DIRECTORY - )); RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); RuleEvaluatorResult result = 
ruleEvaluator.isTransformationNeeded(pipelinesDataFlowModel); - // Assert - assertEquals(result.isEvaluatedResult(), false); + assertFalse(result.isEvaluatedResult()); } @Test void testThrowsExceptionOnFileError() { TransformersFactory transformersFactory = mock(TransformersFactory.class); String pipelineName = "test-pipeline"; - Map sourceOptions = new HashMap(); + Map sourceOptions = new HashMap<>(); sourceOptions.put("option1", "1"); sourceOptions.put("option2", null); final PluginModel source = new PluginModel("http", sourceOptions); @@ -109,16 +108,14 @@ void testThrowsExceptionOnFileError() { final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel( (PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel)); - // Setup mock to throw an exception when file path is incorrect - when(transformersFactory.getPluginRuleFileLocation("documentdb")).thenThrow(new RuntimeException("File not found")); - when(transformersFactory.getPluginRuleFileStream("documentdb")).thenThrow(new RuntimeException("File not found")); + when(transformersFactory.loadRules()).thenThrow(new RuntimeException("File not found")); RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); - // Execute and Assert Exception exception = assertThrows(RuntimeException.class, () -> { ruleEvaluator.isTransformationNeeded(pipelinesDataFlowModel); }); + assertEquals("File not found", exception.getMessage()); } } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java index 2cd6f0fad4..02a6370229 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleTransformerModelTest.java 
@@ -19,7 +19,8 @@ class RuleTransformerModelTest { @Test void testSerialization() throws Exception { List applyWhen = Arrays.asList("condition1", "condition2"); - RuleTransformerModel model = new RuleTransformerModel(applyWhen); + String pluginName = "testPlugin"; + RuleTransformerModel model = new RuleTransformerModel(applyWhen, pluginName); String json = objectMapper.writeValueAsString(model); assertNotNull(json, "Serialized JSON should not be null"); @@ -27,12 +28,13 @@ void testSerialization() throws Exception { @Test void testDeserialization() throws Exception { - String json = "{\"apply_when\":[\"condition1\",\"condition2\"]}"; + String json = "{\"plugin_name\": \"testPlugin\", \"apply_when\": [\"condition1\", \"condition2\"]}"; RuleTransformerModel model = objectMapper.readValue(json, RuleTransformerModel.class); assertNotNull(model, "Deserialized model should not be null"); assertEquals(2, model.getApplyWhen().size(), "ApplyWhen should contain two conditions"); assertEquals("condition1", model.getApplyWhen().get(0), "The first condition should be 'condition1'"); assertEquals("condition2", model.getApplyWhen().get(1), "The second condition should be 'condition2'"); + assertEquals("testPlugin", model.getPluginName(), "plugin Name"); } } diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java index c5e0aea18d..a1f417054c 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java @@ -11,7 +11,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import org.junit.jupiter.api.RepeatedTest; import 
org.junit.jupiter.api.Test; -import org.mockito.Mockito; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader; @@ -19,26 +19,28 @@ import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser; import org.opensearch.dataprepper.pipeline.parser.TestConfigurationProvider; import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator; +import org.opensearch.dataprepper.pipeline.parser.rule.RuleStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; import java.util.Map; class DynamicConfigTransformerTest { private final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory() .disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER)); - private final String RULES_DIRECTORY_PATH = "src/test/resources/transformation/rules"; - private final String TEMPLATES_DIRECTORY_PATH = "src/test/resources/transformation/templates/testSource"; TransformersFactory transformersFactory; RuleEvaluator ruleEvaluator; @Test void test_successful_transformation_with_only_source_and_sink() throws IOException { - String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCDB1_CONFIG_FILE; String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCDB1_CONFIG_FILE; String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; @@ -48,15 +50,15 @@ void test_successful_transformation_with_only_source_and_sink() throws IOExcepti final PipelinesDataflowModelParser pipelinesDataflowModelParser = new PipelinesDataflowModelParser(pipelineConfigurationReader); - transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, - 
TEMPLATES_DIRECTORY_PATH)); - when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); - when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + TransformersFactory transformersFactory = mock(TransformersFactory.class); InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + List ruleStreams = Collections.singletonList(ruleInputStream); + when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); - ruleEvaluator = new RuleEvaluator(transformersFactory); + + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); // Load the original and template YAML files from the test resources directory PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); @@ -74,20 +76,21 @@ void test_successful_transformation_with_documentdb() throws IOException { String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; - String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; + String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; String expectedDocDBFilePath = TestConfigurationProvider.EXPECTED_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; String pluginName = "documentdb"; PipelineConfigurationReader pipelineConfigurationReader = new PipelineConfigurationFileReader(docDBUserConfig); final PipelinesDataflowModelParser pipelinesDataflowModelParser = new 
PipelinesDataflowModelParser(pipelineConfigurationReader); - transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, - TEMPLATES_DIRECTORY_PATH)); - when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); - when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + TransformersFactory transformersFactory = mock(TransformersFactory.class); + Path ruleFile = mock(Path.class); + when(ruleFile.getFileName()).thenReturn(Paths.get(ruleDocDBFilePath).getFileName()); InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + List ruleStreams = Collections.singletonList(ruleInputStream); + when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); ruleEvaluator = new RuleEvaluator(transformersFactory); @@ -103,28 +106,31 @@ void test_successful_transformation_with_documentdb() throws IOException { } @RepeatedTest(5) - @Test void test_successful_transformation_with_subpipelines() throws IOException { String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE; String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_CONFIG_FILE; - String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; + String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; String expectedDocDBFilePath = TestConfigurationProvider.EXPECTED_TRANSFORMATION_DOCUMENTDB_SUBPIPLINES_CONFIG_FILE; String pluginName = "documentdb"; 
PipelineConfigurationReader pipelineConfigurationReader = new PipelineConfigurationFileReader(docDBUserConfig); final PipelinesDataflowModelParser pipelinesDataflowModelParser = new PipelinesDataflowModelParser(pipelineConfigurationReader); - transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, - TEMPLATES_DIRECTORY_PATH)); - when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); - when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + TransformersFactory transformersFactory = mock(TransformersFactory.class); + InputStream ruleStream1 = new FileInputStream(ruleDocDBFilePath); InputStream ruleStream2 = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream1).thenReturn(ruleStream2); + RuleStream ruleInputStream1 = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream1); + RuleStream ruleInputStream2 = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream2); + + List ruleStreams1 = Collections.singletonList(ruleInputStream1); + List ruleStreams2 = Collections.singletonList(ruleInputStream2); + when(transformersFactory.loadRules()).thenReturn(ruleStreams1).thenReturn(ruleStreams2); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); - ruleEvaluator = new RuleEvaluator(transformersFactory); + + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); // Load the original and template YAML files from the test resources directory PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); @@ -139,7 +145,6 @@ void test_successful_transformation_with_subpipelines() throws IOException { @Test void test_successful_transformation_with_functionPlaceholder() throws IOException 
{ - String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE; String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_FUNCTION_CONFIG_FILE; String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; @@ -149,15 +154,17 @@ void test_successful_transformation_with_functionPlaceholder() throws IOExceptio final PipelinesDataflowModelParser pipelinesDataflowModelParser = new PipelinesDataflowModelParser(pipelineConfigurationReader); - transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, - TEMPLATES_DIRECTORY_PATH)); - when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); - when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + TransformersFactory transformersFactory = mock(TransformersFactory.class); + InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + + List ruleStreams = Collections.singletonList(ruleInputStream); + when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); - ruleEvaluator = new RuleEvaluator(transformersFactory); + + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); // Load the original and template YAML files from the test resources directory PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); @@ -170,10 +177,8 @@ void test_successful_transformation_with_functionPlaceholder() throws IOExceptio 
assertThat(expectedYamlasMap).usingRecursiveComparison().isEqualTo(transformedYamlasMap); } - @Test void test_successful_transformation_with_complete_template() throws IOException { - String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCDB2_CONFIG_FILE; String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCDB2_CONFIG_FILE; String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; @@ -183,15 +188,17 @@ void test_successful_transformation_with_complete_template() throws IOException final PipelinesDataflowModelParser pipelinesDataflowModelParser = new PipelinesDataflowModelParser(pipelineConfigurationReader); - transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, - TEMPLATES_DIRECTORY_PATH)); - when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); - when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + TransformersFactory transformersFactory = mock(TransformersFactory.class); + InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + + List ruleStreams = Collections.singletonList(ruleInputStream); + when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); - ruleEvaluator = new RuleEvaluator(transformersFactory); + + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); // Load the original and template YAML files from the test resources directory PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); @@ -205,9 +212,9 @@ 
void test_successful_transformation_with_complete_template() throws IOException assertThat(expectedYamlasMap).usingRecursiveComparison().isEqualTo(transformedYamlasMap); } + @Test void test_successful_transformation_with_routes_keyword() throws IOException { - String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_ROUTES_CONFIG_FILE; String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_FINAL_CONFIG_FILE; String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; @@ -217,15 +224,17 @@ void test_successful_transformation_with_routes_keyword() throws IOException { final PipelinesDataflowModelParser pipelinesDataflowModelParser = new PipelinesDataflowModelParser(pipelineConfigurationReader); - transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, - TEMPLATES_DIRECTORY_PATH)); - when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); - when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + TransformersFactory transformersFactory = mock(TransformersFactory.class); + InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + + List ruleStreams = Collections.singletonList(ruleInputStream); + when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); - ruleEvaluator = new RuleEvaluator(transformersFactory); + + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); // Load the original and template YAML files from the test resources directory PipelinesDataFlowModel 
pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); @@ -239,11 +248,8 @@ void test_successful_transformation_with_routes_keyword() throws IOException { assertThat(expectedYamlasMap).usingRecursiveComparison().isEqualTo(transformedYamlasMap); } - - @Test void test_successful_transformation_with_route_keyword() throws IOException { - String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_ROUTE_CONFIG_FILE; String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_FINAL_CONFIG_FILE; String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; @@ -254,15 +260,17 @@ void test_successful_transformation_with_route_keyword() throws IOException { final PipelinesDataflowModelParser pipelinesDataflowModelParser = new PipelinesDataflowModelParser(pipelineConfigurationReader); - transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, - TEMPLATES_DIRECTORY_PATH)); - when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); - when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + TransformersFactory transformersFactory = mock(TransformersFactory.class); + InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + + List ruleStreams = Collections.singletonList(ruleInputStream); + when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); - ruleEvaluator = new RuleEvaluator(transformersFactory); + + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); // Load 
the original and template YAML files from the test resources directory PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); @@ -276,10 +284,8 @@ void test_successful_transformation_with_route_keyword() throws IOException { assertThat(expectedYamlasMap).usingRecursiveComparison().isEqualTo(transformedYamlasMap); } - @Test void test_successful_transformation_with_routes_and_subpipelines() throws IOException { - String docDBUserConfig = TestConfigurationProvider.USER_CONFIG_TRANSFORMATION_DOCUMENTDB_SUBPIPELINES_ROUTES_CONFIG_FILE; String templateDocDBFilePath = TestConfigurationProvider.TEMPLATE_TRANSFORMATION_DOCUMENTDB_FINAL_CONFIG_FILE; String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCDB1_CONFIG_FILE; @@ -289,18 +295,24 @@ void test_successful_transformation_with_routes_and_subpipelines() throws IOExce final PipelinesDataflowModelParser pipelinesDataflowModelParser = new PipelinesDataflowModelParser(pipelineConfigurationReader); - transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, - TEMPLATES_DIRECTORY_PATH)); - when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); - when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); - InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); + TransformersFactory transformersFactory = mock(TransformersFactory.class); + + InputStream ruleStream1 = new FileInputStream(ruleDocDBFilePath); InputStream ruleStream2 = new FileInputStream(ruleDocDBFilePath); InputStream ruleStream3 = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); + RuleStream ruleInputStream1 = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream1); + RuleStream ruleInputStream2 = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream2); + RuleStream 
ruleInputStream3 = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream3); + + List ruleStreams1 = Collections.singletonList(ruleInputStream1); + List ruleStreams2 = Collections.singletonList(ruleInputStream2); + List ruleStreams3 = Collections.singletonList(ruleInputStream3); - when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream).thenReturn(ruleStream2).thenReturn(ruleStream3); + when(transformersFactory.loadRules()).thenReturn(ruleStreams1).thenReturn(ruleStreams2).thenReturn(ruleStreams3); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); - ruleEvaluator = new RuleEvaluator(transformersFactory); + + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); // Load the original and template YAML files from the test resources directory PipelinesDataFlowModel pipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); @@ -324,14 +336,16 @@ void testInvalidJsonPathThrowsException() throws IOException { final PipelinesDataflowModelParser pipelinesDataflowModelParser = new PipelinesDataflowModelParser(pipelineConfigurationReader); - transformersFactory = Mockito.spy(new TransformersFactory(RULES_DIRECTORY_PATH, - TEMPLATES_DIRECTORY_PATH)); - when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); - when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath); + TransformersFactory transformersFactory = mock(TransformersFactory.class); + InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); InputStream templateStream = new FileInputStream(templateDocDBFilePath); - when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream); + RuleStream ruleInputStream = new RuleStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream); + + List ruleStreams = Collections.singletonList(ruleInputStream); + 
when(transformersFactory.loadRules()).thenReturn(ruleStreams); when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream); + ruleEvaluator = new RuleEvaluator(transformersFactory); // Load the original and template YAML files from the test resources directory diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java index 4b3b8df0e4..ff5b6f21c6 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactoryTest.java @@ -1,76 +1,92 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ package org.opensearch.dataprepper.pipeline.parser.transformer; -import com.fasterxml.jackson.databind.ObjectMapper; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; +import org.opensearch.dataprepper.pipeline.parser.rule.RuleStream; -import java.io.File; -import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; -public class TransformersFactoryTest { +class TransformersFactoryTest { private TransformersFactory transformersFactory; - private final String templatesDirectoryPath = "src/test/resources/templates"; - private final String rulesDirectoryPath = "src/test/resources/rules"; - private final String validPluginName = 
"testPlugin"; - private final String invalidPluginName = ""; @BeforeEach - public void setUp() { - transformersFactory = new TransformersFactory(rulesDirectoryPath, templatesDirectoryPath); + void setUp() { + transformersFactory = new TransformersFactory(); } @Test - public void testGetPluginTemplateFileLocation_validPluginName() { - String expectedPath = templatesDirectoryPath + "/" + validPluginName + "-template.yaml"; - assertEquals(expectedPath, transformersFactory.getPluginTemplateFileLocation(validPluginName)); + void testGetPluginTemplateFileStream_whenTemplateExists_shouldReturnInputStream() throws Exception { + String pluginName = "test-plugin"; + + // Load the actual resource + InputStream inputStream = transformersFactory.getPluginTemplateFileStream(pluginName); + + assertNotNull(inputStream); + inputStream.close(); } @Test - public void testGetPluginTemplateFileLocation_invalidPluginName() { + void testGetPluginTemplateFileStream_whenTemplateDoesNotExist_shouldThrowException() { + String pluginName = "non-existent-plugin"; + Exception exception = assertThrows(RuntimeException.class, () -> { - transformersFactory.getPluginTemplateFileLocation(invalidPluginName); + transformersFactory.getPluginTemplateFileStream(pluginName); }); - assertEquals("Transformation plugin not found", exception.getMessage()); + + assertEquals("Template file not found for plugin: " + pluginName, exception.getMessage()); + } + + @Test + void testLoadRules_whenRulesExist_shouldReturnRuleStreams() throws Exception { + Collection ruleStreams = transformersFactory.loadRules(); + + assertNotNull(ruleStreams); + assertFalse(ruleStreams.isEmpty()); + + for (RuleStream ruleStream : ruleStreams) { + assertNotNull(ruleStream.getRuleStream()); + assertNotNull(ruleStream.getName()); + } } @Test - public void testGetPluginRuleFileLocation_validPluginName() { - String expectedPath = rulesDirectoryPath + "/" + validPluginName + "-rule.yaml"; - assertEquals(expectedPath, 
transformersFactory.getPluginRuleFileLocation(validPluginName)); + void testLoadRules_whenFilesExist_shouldReturnRuleStreams() throws Exception { + // Ensure the rules directory has at least one file + Collection ruleStreams = transformersFactory.loadRules(); + + assertNotNull(ruleStreams); + assertFalse(ruleStreams.isEmpty()); + + for (RuleStream ruleStream : ruleStreams) { + assertNotNull(ruleStream.getRuleStream()); + assertNotNull(ruleStream.getName()); + assertTrue(ruleStream.getName().endsWith("-rule.yaml")); + } } @Test - public void testGetPluginRuleFileLocation_invalidPluginName() { + void testGetPluginTemplateFileStream_whenPluginNameIsNull_shouldThrowException() { Exception exception = assertThrows(RuntimeException.class, () -> { - transformersFactory.getPluginRuleFileLocation(invalidPluginName); + transformersFactory.getPluginTemplateFileStream(null); }); + assertEquals("Transformation plugin not found", exception.getMessage()); } @Test - public void testGetTemplateModel_throwsRuntimeExceptionOnIOException() throws IOException { - ObjectMapper mockedYamlMapper = Mockito.mock(ObjectMapper.class); - String templatePath = templatesDirectoryPath + "/" + validPluginName + "-template.yaml"; - File expectedFile = new File(templatePath); - - Mockito.when(mockedYamlMapper.readValue(Mockito.eq(expectedFile), Mockito.eq(PipelineTemplateModel.class))) - .thenThrow(new IOException("Test exception")); + void testGetPluginTemplateFileStream_whenPluginNameIsEmpty_shouldThrowException() { + Exception exception = assertThrows(RuntimeException.class, () -> { + transformersFactory.getPluginTemplateFileStream(""); + }); - assertThrows(RuntimeException.class, () -> transformersFactory.getTemplateModel(validPluginName)); + assertEquals("Transformation plugin not found", exception.getMessage()); } - @Test - public void testGetTemplateModel_invalidPluginNameThrowsRuntimeException() { - assertThrows(RuntimeException.class, () -> 
transformersFactory.getTemplateModel(invalidPluginName), - "Should throw a RuntimeException for empty plugin name."); - } } - diff --git a/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/rules/test-plugin-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/rules/test-plugin-rule.yaml new file mode 100644 index 0000000000..bdee2aaacf --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/rules/test-plugin-rule.yaml @@ -0,0 +1,3 @@ +plugin_name: "test-plugin" +apply_when: + - "$..source.documentdb" \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/templates/test-plugin-template.yaml b/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/templates/test-plugin-template.yaml new file mode 100644 index 0000000000..e9e95b4ff0 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/resources/org/opensearch/dataprepper/transforms/templates/test-plugin-template.yaml @@ -0,0 +1,4 @@ +"<<pipeline-name>>-transformed": + source: "<<$.<<pipeline-name>>.source>>" + sink: + - noop: diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml index 28da1406f8..b120d1531c 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb-rule.yaml @@ -1,3 +1,4 @@ +plugin_name: "documentdb" apply_when: - "$..source.documentdb" - - "$..source.documentdb.collections[0].s3_bucket" \ No newline at end of file + - "$..source.documentdb.s3_bucket" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml
b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml index cb10684065..ed3c4b8b57 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/rules/documentdb1-rule.yaml @@ -1,2 +1,3 @@ +plugin_name: "documentdb" apply_when: - "$..source.documentdb" \ No newline at end of file diff --git a/data-prepper-plugins/dummy-plugin/README.md b/data-prepper-plugins/dummy-plugin/README.md new file mode 100644 index 0000000000..b741ffe5ba --- /dev/null +++ b/data-prepper-plugins/dummy-plugin/README.md @@ -0,0 +1,63 @@ +A dummy plugin is used specifically when we want to use pipeline transformation for a +plugin that does not actually exist. + +We can define a rule and a template for this plugin by creating a plugin folder and +placing in it a rule that determines which pipeline configurations match, and a corresponding +template that the configuration is transformed to when the rule matches. + +For further details on transformation, refer to: +/docs/pipeline_configuration_transformation.md + + +For example: + +User Config: +```yaml +test-pipeline: + source: + file: + path: "/full/path/to/logs_json.log" + record_type: "event" + format: "json" + processor: + - dummy_plugin: + sink: + - stdout: +``` + +Here dummy_plugin is not really a plugin that is defined in Data Prepper, but we can +use the pipeline transformation to convert the user config to a different config based +on the template.
+ +Rule: +```yaml +plugin_name: "dummy-plugin" +apply_when: + - "$.test-pipeline.processor[?(@.dummy_plugin)]" +``` + +Template: +```yaml +test-pipeline: + source: + file: + path: "/tmp/input-file.log" + processor: + - string_converter: + upper_case: true + sink: + - noop: +``` + +Output: +```yaml +test-pipeline: + source: + file: + path: "/tmp/input-file.log" + processor: + - string_converter: + upper_case: true + sink: + - noop: +``` \ No newline at end of file diff --git a/data-prepper-plugins/dummy-plugin/build.gradle b/data-prepper-plugins/dummy-plugin/build.gradle new file mode 100644 index 0000000000..e69de29bb2 diff --git a/data-prepper-plugins/dummy-plugin/src/main/resources/org/opensearch/dataprepper/transforms/rules/dummy-plugin-rule.yaml b/data-prepper-plugins/dummy-plugin/src/main/resources/org/opensearch/dataprepper/transforms/rules/dummy-plugin-rule.yaml new file mode 100644 index 0000000000..6c26f86b2a --- /dev/null +++ b/data-prepper-plugins/dummy-plugin/src/main/resources/org/opensearch/dataprepper/transforms/rules/dummy-plugin-rule.yaml @@ -0,0 +1,3 @@ +plugin_name: "dummy-plugin" +apply_when: + - "$.test-pipeline.processor[?(@.dummy_plugin)]" diff --git a/data-prepper-plugins/dummy-plugin/src/main/resources/org/opensearch/dataprepper/transforms/templates/dummy-plugin-template.yaml b/data-prepper-plugins/dummy-plugin/src/main/resources/org/opensearch/dataprepper/transforms/templates/dummy-plugin-template.yaml new file mode 100644 index 0000000000..bbf00a285e --- /dev/null +++ b/data-prepper-plugins/dummy-plugin/src/main/resources/org/opensearch/dataprepper/transforms/templates/dummy-plugin-template.yaml @@ -0,0 +1,9 @@ +test-pipeline: + source: + file: + path: "/tmp/input-file.log" + processor: + - string_converter: + upper_case: true + sink: + - noop: \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/main/resources/rules/documentdb-rule.yaml 
b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml similarity index 74% rename from data-prepper-pipeline-parser/src/main/resources/rules/documentdb-rule.yaml rename to data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml index e7279ee733..60aa428d8a 100644 --- a/data-prepper-pipeline-parser/src/main/resources/rules/documentdb-rule.yaml +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/documentdb-rule.yaml @@ -1,3 +1,4 @@ +plugin_name: "documentdb" apply_when: - "$..source.documentdb" - "$..source.documentdb.s3_bucket" \ No newline at end of file diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/documentdb-template.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/documentdb-template.yaml new file mode 100644 index 0000000000..38bb70d8ca --- /dev/null +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/documentdb-template.yaml @@ -0,0 +1,81 @@ +"<<pipeline-name>>": + workers: "<<$.<<pipeline-name>>.workers>>" + delay: "<<$.<<pipeline-name>>.delay>>" + buffer: "<<$.<<pipeline-name>>.buffer>>" + source: + documentdb: "<<$.<<pipeline-name>>.source.documentdb>>" + routes: + - initial_load: 'getMetadata("ingestion_type") == "EXPORT"' + - stream_load: 'getMetadata("ingestion_type") == "STREAM"' + sink: + - s3: + routes: + - initial_load + aws: + region: "<<$.<<pipeline-name>>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_role_arn>>" + sts_external_id: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_header_overrides>>" + bucket: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>" + threshold: + event_collect_timeout: "120s" + maximum_size: "2mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: 
"${getMetadata(\"s3_partition_key\")}" + codec: + event_json: + default_bucket_owner: "<<FUNCTION_NAME:getAccountIdFromRole,PARAMETER:$.<<pipeline-name>>.source.documentdb.aws.sts_role_arn>>" + - s3: + routes: + - stream_load + aws: + region: "<<$.<<pipeline-name>>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_role_arn>>" + sts_external_id: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_header_overrides>>" + bucket: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>" + threshold: + event_collect_timeout: "15s" + maximum_size: "1mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + event_json: + default_bucket_owner: "<<FUNCTION_NAME:getAccountIdFromRole,PARAMETER:$.<<pipeline-name>>.source.documentdb.aws.sts_role_arn>>" +"<<pipeline-name>>-s3": + workers: "<<$.<<pipeline-name>>.workers>>" + delay: "<<$.<<pipeline-name>>.delay>>" + buffer: "<<$.<<pipeline-name>>.buffer>>" + source: + s3: + codec: + event_json: + compression: "none" + aws: + region: "<<$.<<pipeline-name>>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_role_arn>>" + sts_external_id: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<<pipeline-name>>.source.documentdb.aws.sts_header_overrides>>" + acknowledgments: true + delete_s3_objects_on_read: true + disable_s3_metadata_in_event: true + scan: + folder_partitions: + depth: "<<FUNCTION_NAME:calculateDepth,PARAMETER:$.<<pipeline-name>>.source.documentdb.s3_prefix>>" + max_objects_per_ownership: 50 + buckets: + - bucket: + name: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>" + filter: + include_prefix: ["<<FUNCTION_NAME:getSourceCoordinationIdentifierEnvVariable,PARAMETER:$.<<pipeline-name>>.source.documentdb.s3_prefix>>"] + scheduling: + interval: "60s" + processor: "<<$.<<pipeline-name>>.processor>>" + sink: "<<$.<<pipeline-name>>.sink>>" + routes: "<<$.<<pipeline-name>>.routes>>" # In placeholder, routes or route (defined as alias) will be transformed to route in json as route will be primarily picked in pipelineModel. 
\ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 63e7ad2a9f..20edb79287 100644 --- a/settings.gradle +++ b/settings.gradle @@ -180,4 +180,5 @@ include 'data-prepper-plugins:mongodb' include 'data-prepper-plugins:rds-source' include 'data-prepper-plugins:http-source-common' include 'data-prepper-plugins:http-common' -include 'data-prepper-plugins:aws-lambda' \ No newline at end of file +include 'data-prepper-plugins:aws-lambda' +//include 'data-prepper-plugins:dummy-plugin' \ No newline at end of file