diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java
index 6650c6b9ee..9cf83b11d7 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/PipelineTransformationConfiguration.java
@@ -10,30 +10,38 @@ import org.springframework.context.annotation.Configuration;
 
 import javax.inject.Named;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 
 @Configuration
 public class PipelineTransformationConfiguration {
     public static final String TEMPLATES_DIRECTORY_PATH = "TEMPLATES_DIRECTORY_PATH";
-    public static final String RULES_DIRECTORY_PATH = "VALIDATORS_DIRECTORY_PATH";
+    public static final String RULES_DIRECTORY_PATH = "RULES_DIRECTORY_PATH";
+    private static final Path currentDir = Paths.get(System.getProperty("user.dir"));
+//    private static final String parserRelativePath = "/data-prepper-pipeline-parser/src";
 
     @Bean
     @Named(RULES_DIRECTORY_PATH)
     static String provideRulesDirectoryPath() {
-        return "resources/rules";
+        ClassLoader classLoader = PipelineTransformationConfiguration.class.getClassLoader();
+        String filePath = classLoader.getResource("rules").getFile();
+        return filePath;
     }
 
     @Bean
     @Named(TEMPLATES_DIRECTORY_PATH)
     static String provideTemplateDirectoryPath() {
-        return "resources/templates";
+        ClassLoader classLoader = PipelineTransformationConfiguration.class.getClassLoader();
+        String filePath = classLoader.getResource("templates").getFile();
+        return filePath;
     }
 
     @Bean
     TransformersFactory transformersFactory(
-            @Named(TEMPLATES_DIRECTORY_PATH) String provideTransformerDirectoryPath,
-            @Named(RULES_DIRECTORY_PATH) String provideTemplateDirectoryPath
+            @Named(RULES_DIRECTORY_PATH) String rulesDirectoryPath,
+            @Named(TEMPLATES_DIRECTORY_PATH) String templatesDirectoryPath
     ) {
-        return new TransformersFactory(RULES_DIRECTORY_PATH, TEMPLATES_DIRECTORY_PATH);
+        return new TransformersFactory(rulesDirectoryPath, templatesDirectoryPath);
     }
 
     @Bean
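A note on the bean change above: `ClassLoader.getResource(...).getFile()` only yields a usable filesystem path while the resources sit exploded on disk; packaged into a jar it produces a `jar:file:...!/rules`-style path that `new File(...)` cannot open. That is presumably why the rest of this patch reads rules and templates as streams instead. A minimal, self-contained sketch of the difference (class name hypothetical; the resource name matches the rule file added under src/main/resources in this diff):

```java
import java.io.IOException;
import java.io.InputStream;

public class ResourceLoadingSketch {
    public static void main(String[] args) throws IOException {
        ClassLoader cl = ResourceLoadingSketch.class.getClassLoader();

        // Usable only when resources are exploded on disk; inside a jar this
        // prints a jar: URL whose getFile() path new File() cannot open.
        System.out.println(cl.getResource("rules"));

        // Works both on disk and inside a jar: read the resource as a stream.
        try (InputStream in = cl.getResourceAsStream("rules/documentdb-rule.yaml")) {
            if (in == null) {
                throw new IOException("rules/documentdb-rule.yaml not on classpath");
            }
            System.out.println("read " + in.readAllBytes().length + " bytes");
        }
    }
}
```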
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java
index 5971fe17e3..a94b8a769f 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluator.java
@@ -22,8 +22,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
 
@@ -53,19 +53,25 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin
      */
     private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel) {
         PLUGIN_NAME = "documentdb";
-        String pluginRulesPath = transformersFactory.getPluginRuleFileLocation(PLUGIN_NAME);
+//        String pluginRulesPath = transformersFactory.getPluginRuleFileLocation(PLUGIN_NAME);
+//        File ruleFile = new File(pluginRulesPath);
+//        LOG.info("Checking rule path {}",ruleFile.getAbsolutePath());
+
         Map<String, PipelineModel> pipelines = pipelinesModel.getPipelines();
 
         for (Map.Entry<String, PipelineModel> entry : pipelines.entrySet()) {
             try {
                 String pipelineJson = OBJECT_MAPPER.writeValueAsString(entry);
-                if (evaluate(pipelineJson, pluginRulesPath)) {
-                    LOG.debug("Rule path {} is evaluated true for pipelineJson {}",pluginRulesPath, pipelineJson);
+                if (evaluate(pipelineJson, PLUGIN_NAME)) {
+                    LOG.info("Rule for {} is evaluated true for pipelineJson {}", PLUGIN_NAME, pipelineJson);
 
-                    String templateFilePath = transformersFactory.getPluginTemplateFileLocation(PLUGIN_NAME);
-                    PipelineTemplateModel templateModel = yamlMapper.readValue(new File(templateFilePath),
+//                    String templateFilePathString = transformersFactory.getPluginTemplateFileLocation(PLUGIN_NAME);
+//                    File templateFile = new File(templateFilePathString);
+//                    LOG.info("Absolute path of template file: {}",templateFile.getAbsolutePath());
+                    InputStream templateStream = transformersFactory.getPluginTemplateFileStream(PLUGIN_NAME);
+                    PipelineTemplateModel templateModel = yamlMapper.readValue(templateStream,
                             PipelineTemplateModel.class);
-                    LOG.debug("Chosen template file {}",templateFilePath);
+                    LOG.info("Template is chosen for {}", PLUGIN_NAME);
 
                     return RuleEvaluatorResult.builder()
                             .withEvaluatedResult(true)
@@ -89,7 +95,7 @@ private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel)
     }
 
     private Boolean evaluate(String pipelinesJson,
-                             String rulePath) {
+                             String pluginName) {
 
         Configuration parseConfig = Configuration.builder()
                 .jsonProvider(new JacksonJsonProvider())
@@ -100,16 +106,20 @@ private Boolean evaluate(String pipelinesJson,
         ReadContext readPathContext = parseContext.parse(pipelinesJson);
 
         try {
-            RuleTransformerModel rulesModel = yamlMapper.readValue(new File(rulePath), RuleTransformerModel.class);
+            //TODO
+//            ClassLoader classLoader = RuleEvaluator.class.getClassLoader();
+//            InputStream filestream = classLoader.getResourceAsStream("rules/documentdb-rule.yaml");
+            InputStream ruleStream = transformersFactory.getPluginRuleFileStream(pluginName);
+            RuleTransformerModel rulesModel = yamlMapper.readValue(ruleStream, RuleTransformerModel.class);
             List<String> rules = rulesModel.getApplyWhen();
 
             for (String rule : rules) {
                 Object result = readPathContext.read(rule);
             }
         } catch (IOException e) {
-            LOG.warn("Error reading file {}", rulePath);
+            LOG.warn("Error reading {} rule",pluginName);
             return false;
         } catch (PathNotFoundException e) {
-            LOG.warn("Path not found {}", rulePath);
+            LOG.warn("Json Path not found for {}", pluginName);
             return false;
         }
         return true;
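For context on the reworked `evaluate(pipelineJson, pluginName)`: it loads `rules/<plugin>-rule.yaml` from the classpath and treats the rule as matched only if every `apply_when` JsonPath resolves against the serialized pipeline. A self-contained sketch of that evaluation (assumed semantics; the production code relies solely on `PathNotFoundException`/`IOException` to signal a non-match):

```java
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.jayway.jsonpath.ReadContext;
import java.util.List;

public class ApplyWhenSketch {
    public static void main(String[] args) {
        String pipelineJson = "{\"my-pipeline\":{\"source\":{\"documentdb\":{\"s3_bucket\":\"my-bucket\"}}}}";
        List<String> applyWhen = List.of("$..source.documentdb", "$..source.documentdb.s3_bucket");

        ReadContext ctx = JsonPath.parse(pipelineJson);
        boolean matches = true;
        for (String rule : applyWhen) {
            try {
                Object result = ctx.read(rule);
                // Deep-scan ($..) paths return an empty list instead of throwing
                // when nothing matches, so check that case explicitly here.
                if (result instanceof List && ((List<?>) result).isEmpty()) {
                    matches = false;
                }
            } catch (PathNotFoundException e) {
                // Definite paths throw when the property is absent.
                matches = false;
            }
        }
        System.out.println(matches); // true -> transformation is applied
    }
}
```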
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java
index 6bb796b9a9..d2b1b9dc45 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java
@@ -7,6 +7,7 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.JsonPath;
@@ -68,6 +69,7 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransforme
     private static final String JSON_PATH_ARRAY_DISAMBIGUATOR_PATTERN = "[?(@.";
     private static final String RECURSIVE_JSON_PATH_PATH = "$..";
     private static final String JSON_PATH_IDENTIFIER = "$.";
+    private static final String ARRAY_NODE_PATTERN = "([^\\[]+)\\[(\\d+)\\]$";
 
     private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER";
@@ -102,12 +104,14 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
 
         if (!ruleEvaluatorResult.isEvaluatedResult() ||
                 ruleEvaluatorResult.getPipelineName() == null) {
+            LOG.info("No transformation needed");
             return preTransformedPipelinesDataFlowModel;
         }
 
         //To differentiate between sub-pipelines that dont need transformation.
         String pipelineNameThatNeedsTransformation = ruleEvaluatorResult.getPipelineName();
         PipelineTemplateModel templateModel = ruleEvaluatorResult.getPipelineTemplateModel();
+        LOG.info("Transforming pipeline config for pipeline {}",pipelineNameThatNeedsTransformation);
 
         try {
             Map<String, PipelineModel> pipelines = preTransformedPipelinesDataFlowModel.getPipelines();
@@ -121,6 +125,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
 
             //Replace pipeline name placeholder with pipelineNameThatNeedsTransformation
             String templateJsonString = replaceTemplatePipelineName(templateJsonStringWithPipelinePlaceholder,
                     pipelineNameThatNeedsTransformation);
+            LOG.info("Template - {}",templateJsonString);
 
             // Find all PLACEHOLDER_PATTERN in template json string
             Map<String, List<String>> placeholdersMap = findPlaceholdersWithPathsRecursively(templateJsonString);
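The hunk above scans the template JSON for PLACEHOLDER_PATTERN occurrences, but the pattern's value is not shown in this diff; the regex below is an assumed stand-in just to illustrate the scan (class name hypothetical):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderScanSketch {
    public static void main(String[] args) {
        // Assumed shape of PLACEHOLDER_PATTERN: anything wrapped in <<...>>.
        Pattern placeholder = Pattern.compile("<<\\s*(.+?)\\s*>>");
        String templateJson = "{\"workers\":\"<<$.my-pipeline.workers>>\"}";

        Matcher m = placeholder.matcher(templateJson);
        while (m.find()) {
            // group(1) is the JsonPath to pull from the original pipeline config.
            System.out.println(m.group(1)); // $.my-pipeline.workers
        }
    }
}
```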
@@ -169,19 +174,18 @@ private PipelinesDataFlowModel getTransformedPipelinesDataFlowModel(String pipel
      * @throws JsonProcessingException
      */
     private PipelinesDataFlowModel getTransformedPipelinesDataFlowModel(String pipelineNameThatNeedsTransformation, PipelinesDataFlowModel preTransformedPipelinesDataFlowModel, JsonNode templateRootNode) throws JsonProcessingException {
+
         //update template json
-        String transformedJson = objectMapper.writeValueAsString(templateRootNode);
-        LOG.debug("{} pipeline has been transformed to :{}", pipelineNameThatNeedsTransformation, transformedJson);
+        JsonNode transformedJsonNode = templateRootNode.get(TEMPLATE_PIPELINE_ROOT_STRING);
+        String transformedJson = objectMapper.writeValueAsString(transformedJsonNode);
+        LOG.info("{} pipeline has been transformed to :{}", pipelineNameThatNeedsTransformation, transformedJson);
 
         //convert TransformedJson to PipelineModel with the data from preTransformedDataFlowModel.
         //transform transformedJson to Map
-        Map<String, Object> transformedConfigMap = objectMapper.readValue(transformedJson, Map.class);
+        PipelinesDataFlowModel transformedSinglePipelineDataFlowModel = objectMapper.readValue(transformedJson, PipelinesDataFlowModel.class);
+        Map<String, PipelineModel> transformedPipelines = transformedSinglePipelineDataFlowModel.getPipelines();
 
         Map<String, PipelineModel> pipelines = preTransformedPipelinesDataFlowModel.getPipelines();
-
-        // get the root of the Transformed Pipeline Model, to get the actual pipelines.
-        // direct conversion to PipelineDataModel throws exception.
-        Map<String, PipelineModel> transformedPipelines = (Map<String, PipelineModel>) transformedConfigMap.get(TEMPLATE_PIPELINE_ROOT_STRING);
         pipelines.forEach((pipelineName, pipeline) -> {
             if (!pipelineName.equals(pipelineNameThatNeedsTransformation)) {
                 transformedPipelines.put(pipelineName, pipeline);
@@ -194,7 +198,7 @@ private PipelinesDataFlowModel getTransformedPipelinesDataFlowModel(String pipel
                 transformedPipelines
         );
         String transformedPipelinesDataFlowModelJson = objectMapper.writeValueAsString(transformedPipelinesDataFlowModel);
-        LOG.debug("Transformed PipelinesDataFlowModel: {}", transformedPipelinesDataFlowModelJson);
+        LOG.info("Transformed PipelinesDataFlowModel: {}", transformedPipelinesDataFlowModelJson);
         return transformedPipelinesDataFlowModel;
     }
 
@@ -311,6 +315,9 @@ private String executeFunctionPlaceholder(String functionPlaceholderValue, Strin
     private Object parseParameter(String parameter, String pipelineJson) {
         if(isJsonPath(parameter)){
             JsonNode pipelineNode = JsonPath.using(parseConfigWithJsonNode).parse(pipelineJson).read(parameter);
+            if(pipelineNode==null){
+                return null;
+            }
             if(!pipelineNode.isValueNode()){
                 throw new RuntimeException("parameter has to be a value node");
             }
@@ -343,11 +350,17 @@ private boolean isJsonPath(String parameter) {
      * @return
      */
     public String calculateDepth(String s3Prefix) {
+        if(s3Prefix == null){
+            return Integer.toString(3);
+        }
         return Integer.toString(s3Prefix.split("/").length + 3);
     }
 
     public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
         String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
+        if(s3Prefix == null){
+            return envSourceCoordinationIdentifier;
+        }
         return s3Prefix+"/"+envSourceCoordinationIdentifier;
     }
 
@@ -383,18 +396,32 @@ public Object invokeMethod(String methodName, Class<?> parameterType, Object arg
     public void replaceNode(JsonNode root, String jsonPath, JsonNode newNode) {
         try {
             if (newNode == null) {
-//                throw new PathNotFoundException(format("jsonPath {} not found", jsonPath));
-                LOG.debug("Did not find jsonPath {}",jsonPath);
+                LOG.info("Did not find jsonPath {}",jsonPath);
             }
             // Read the parent path of the target node
             String parentPath = jsonPath.substring(0, jsonPath.lastIndexOf('.'));
             String fieldName = jsonPath.substring(jsonPath.lastIndexOf('.') + 1);
 
+            //Handle if fieldName is an array
+            Pattern pattern = Pattern.compile(ARRAY_NODE_PATTERN);
+            Matcher matcher = pattern.matcher(fieldName);
+
             // Find the parent node
             JsonNode parentNode = JsonPath.using(parseConfigWithJsonNode).parse(root).read(parentPath);
 
             // Replace the target field in the parent node
-            if (parentNode != null && parentNode instanceof ObjectNode) {
+            if(matcher.find()){
+                //Handle array
+                String field = matcher.group(1);
+                int index = Integer.parseInt(matcher.group(2));
+                JsonNode arrayNodeResult = JsonPath.using(parseConfigWithJsonNode).parse(root).read(parentPath+"."+field);
+                if (!(arrayNodeResult instanceof ArrayNode)){
+                    throw new RuntimeException("Json path is of array type, but parsed result is not arrayNode");
+                }
+                ArrayNode arrayNode = (ArrayNode) arrayNodeResult;
+                // Replace the element in the array
+                arrayNode.set(index, newNode);
+            } else if (parentNode != null && parentNode instanceof ObjectNode) {
                 ((ObjectNode) parentNode).replace(fieldName, newNode);
             } else {
                 LOG.error("Path does not point to object node");
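The new array branch in `replaceNode` keys off the `ARRAY_NODE_PATTERN` constant added earlier in this file. A quick standalone check of how that regex splits a trailing array segment (class name hypothetical):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ArrayNodePatternSketch {
    public static void main(String[] args) {
        // Same regex as ARRAY_NODE_PATTERN in this diff.
        Pattern p = Pattern.compile("([^\\[]+)\\[(\\d+)\\]$");

        Matcher m = p.matcher("sink[0]");
        if (m.find()) {
            System.out.println(m.group(1)); // sink  -> field holding the ArrayNode
            System.out.println(m.group(2)); // 0     -> index passed to ArrayNode.set
        }

        // A plain field name does not match, so replaceNode falls through
        // to the ObjectNode.replace branch.
        System.out.println(p.matcher("bucket").find()); // false
    }
}
```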
diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java
index ec09e90fe7..74c7432a41 100644
--- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java
+++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/TransformersFactory.java
@@ -13,6 +13,7 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 
 public class TransformersFactory implements PipelineTransformationPathProvider {
 
@@ -22,7 +23,6 @@ public class TransformersFactory implements PipelineTransformationPathProvider {
     private final String RULE_FILE_NAME_PATTERN = "-rule.yaml";
     private final String templatesDirectoryPath;
     private final String rulesDirectoryPath;
-    String PLUGIN_NAME = null;
 
     public TransformersFactory(@Named(RULES_DIRECTORY_PATH) final String rulesDirectoryPath,
                                @Named(TEMPLATES_DIRECTORY_PATH) final String templatesDirectoryPath) {
@@ -54,6 +54,24 @@ public String getPluginRuleFileLocation(String pluginName) {
         return rulesDirectoryPath + "/" + pluginName + RULE_FILE_NAME_PATTERN;
     }
 
+    public InputStream getPluginRuleFileStream(String pluginName) {
+        if(pluginName.isEmpty()){
+            throw new RuntimeException("Transformation plugin not found");
+        }
+        ClassLoader classLoader = TransformersFactory.class.getClassLoader();
+        InputStream filestream = classLoader.getResourceAsStream("rules" + "/" + pluginName + RULE_FILE_NAME_PATTERN);
+        return filestream;
+    }
+
+    public InputStream getPluginTemplateFileStream(String pluginName) {
+        if(pluginName.isEmpty()){
+            throw new RuntimeException("Transformation plugin not found");
+        }
+        ClassLoader classLoader = TransformersFactory.class.getClassLoader();
+        InputStream filestream = classLoader.getResourceAsStream("templates" + "/" + pluginName + TEMPLATE_FILE_NAME_PATTERN);
+        return filestream;
+    }
+
     public PipelineTemplateModel getTemplateModel(String pluginName) {
         String templatePath = getPluginTemplateFileLocation(pluginName);
diff --git a/data-prepper-pipeline-parser/src/main/resources/rules/documentdb-rule.yaml b/data-prepper-pipeline-parser/src/main/resources/rules/documentdb-rule.yaml
new file mode 100644
index 0000000000..e7279ee733
--- /dev/null
+++ b/data-prepper-pipeline-parser/src/main/resources/rules/documentdb-rule.yaml
@@ -0,0 +1,3 @@
+apply_when:
+  - "$..source.documentdb"
+  - "$..source.documentdb.s3_bucket"
\ No newline at end of file
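One caveat on the new accessors: `getResourceAsStream` returns `null` rather than throwing when the resource is missing, so a caller only discovers the problem later as a Jackson parse error. A hypothetical caller that fails fast instead (helper name invented; uses only the public method added in this diff):

```java
import java.io.IOException;
import java.io.InputStream;
import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory;

public class TransformersFactoryUsageSketch {
    // Drains the rule stream, failing loudly when the classpath resource is
    // absent instead of handing a null stream to the YAML mapper.
    static byte[] readRule(TransformersFactory factory, String pluginName) throws IOException {
        try (InputStream ruleStream = factory.getPluginRuleFileStream(pluginName)) {
            if (ruleStream == null) {
                throw new IOException("rules/" + pluginName + "-rule.yaml not found on classpath");
            }
            return ruleStream.readAllBytes();
        }
    }
}
```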
"<<$.<>.source.documentdb.s3_bucket>>" + threshold: + event_collect_timeout: "120s" + maximum_size: "2mb" + aggregate_threshold: + maximum_size: "256kb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + event_json: + - s3: + routes: + - stream_load + aws: + region: "<<$.<>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<>.source.documentdb.opensearch.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.documentdb.opensearch.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.documentdb.opensearch.aws.sts_header_overrides>>" + bucket: "<<$.<>.source.documentdb.s3_bucket>>" + threshold: + event_collect_timeout: "30s" + maximum_size: "1mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + event_json: + +"<>-s3": + workers: "<<$.<>.workers>>" + delay: "<<$.<>.delay>>" + buffer: "<<$.<>.buffer>>" + source: + s3: + codec: + event_json: + compression: "none" + aws: + region: "<<$.<>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<>.source.documentdb.opensearch.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.documentdb.opensearch.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.documentdb.opensearch.aws.sts_header_overrides>>" + acknowledgments: true + delete_s3_objects_on_read: true + scan: + buckets: + - bucket: + name: "<<$.<>.source.documentdb.s3_bucket>>" + filter: + include_prefix: ["<>.source.documentdb.s3_prefix>>"] + scheduling: + interval: "60s" + processor: "<<$.<>.processor>>" + sink: "<<$.<>.sink>>" + routes: "<<$.<>.routes>>" \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/resources/rules/documentdb-rule.yaml b/data-prepper-pipeline-parser/src/resources/rules/documentdb-rule.yaml deleted file mode 100644 index 28da1406f8..0000000000 --- a/data-prepper-pipeline-parser/src/resources/rules/documentdb-rule.yaml +++ /dev/null @@ -1,3 +0,0 @@ -apply_when: - - "$..source.documentdb" - - "$..source.documentdb.collections[0].s3_bucket" \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/resources/templates/documentdb-template.yaml b/data-prepper-pipeline-parser/src/resources/templates/documentdb-template.yaml deleted file mode 100644 index 9cb3c67aa2..0000000000 --- a/data-prepper-pipeline-parser/src/resources/templates/documentdb-template.yaml +++ /dev/null @@ -1,83 +0,0 @@ -"<>": - workers: "<<$.<>.workers>>" - delay: "<<$.<>.delay>>" - buffer: - bounded_blocking: - batch_size: 125000 - buffer_size: 1000000 - source: - documentdb: "<<$.<>.source.documentdb>>" - routes: - - initial_load: 'getMetadata("ingestion_type") == "EXPORT"' - - stream_load: 'getMetadata("ingestion_type") == "STREAM"' - sink: - - s3: - routes: - - initial_load - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.opensearch.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.opensearch.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.opensearch.aws.sts_header_overrides>>" - bucket: "<<$.<>.source.documentdb.s3_bucket>>" - threshold: - event_collect_timeout: "120s" - maximum_size: "2mb" - aggregate_threshold: - maximum_size: "256kb" - flush_capacity_ratio: 0 - object_key: - path_prefix: "${getMetadata(\"s3_partition_key\")}" - codec: - json: - - s3: - routes: - - stream_load - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: 
"<<$.<>.source.documentdb.opensearch.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.opensearch.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.opensearch.aws.sts_header_overrides>>" - bucket: "<<$.<>.source.documentdb.s3_bucket>>" - threshold: - event_collect_timeout: "30s" - maximum_size: "1mb" - aggregate_threshold: - maximum_size: "128mb" - flush_capacity_ratio: 0 - object_key: - path_prefix: "${getMetadata(\"s3_partition_key\")}" - codec: - json: - - "<>.s3": - workers: "<<$.<>.workers>>" - delay: "<<$.<>.delay>>" - buffer: - bounded_blocking: - batch_size: 125000 - buffer_size: 1000000 - source: - s3: - codec: - json: - compression: "none" - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.opensearch.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.opensearch.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.opensearch.aws.sts_header_overrides>>" - acknowledgments: true - delete_s3_objects_on_read: true - scan: - buckets: - - bucket: - name: "<<$.<>.source.documentdb.s3_bucket>>" - filter: - include_prefix: ["<>.source.documentdb.s3_prefix>>"] - depth: "<>.source.documentdb.s3_prefix>>" - scheduling: - interval: "1s" - processor: "<<$.<>.processor>>" - sink: "<<$.<>.sink>>" - routes: "<<$.<>.routes>>" \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java index 9b5bec5c57..361feb3558 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/rule/RuleEvaluatorTest.java @@ -19,6 +19,9 @@ import org.opensearch.dataprepper.pipeline.parser.TestConfigurationProvider; import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -28,7 +31,7 @@ class RuleEvaluatorTest { @Test - void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() { + void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() throws FileNotFoundException { // Set up String ruleDocDBFilePath = TestConfigurationProvider.RULES_TRANSFORMATION_DOCUMENTDB_CONFIG_FILE; @@ -54,6 +57,10 @@ void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() { )); RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath); + + InputStream ruleStream = new FileInputStream(ruleDocDBFilePath); + when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream); + RuleEvaluatorResult result = ruleEvaluator.isTransformationNeeded(pipelinesDataFlowModel); // Assert @@ -104,6 +111,8 @@ void testThrowsExceptionOnFileError() { // Setup mock to throw an exception when file path is incorrect when(transformersFactory.getPluginRuleFileLocation("documentdb")).thenThrow(new RuntimeException("File not found")); + when(transformersFactory.getPluginRuleFileStream("documentdb")).thenThrow(new RuntimeException("File not found")); + RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory); // Execute 
diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java
index 68dae1b662..cdc2463bd9 100644
--- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java
+++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java
@@ -23,7 +23,9 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
 
 @RunWith(PowerMockRunner.class)
@@ -54,6 +56,10 @@ void test_successful_transformation_with_only_source_and_sink() throws IOExcepti
                 TEMPLATES_DIRECTORY_PATH));
         when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath);
         when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath);
+        InputStream ruleStream = new FileInputStream(ruleDocDBFilePath);
+        InputStream templateStream = new FileInputStream(templateDocDBFilePath);
+        when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream);
+        when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream);
         ruleEvaluator = new RuleEvaluator(transformersFactory);
 
         // Load the original and template YAML files from the test resources directory
@@ -83,6 +89,10 @@ void test_successful_transformation_with_documentdb() throws IOException {
                 TEMPLATES_DIRECTORY_PATH));
         when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath);
         when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath);
+        InputStream ruleStream = new FileInputStream(ruleDocDBFilePath);
+        InputStream templateStream = new FileInputStream(templateDocDBFilePath);
+        when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream);
+        when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream);
         ruleEvaluator = new RuleEvaluator(transformersFactory);
 
         // Load the original and template YAML files from the test resources directory
@@ -112,6 +122,10 @@ void test_successful_transformation_with_subpipelines() throws IOException {
                 TEMPLATES_DIRECTORY_PATH));
         when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath);
         when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath);
+        InputStream ruleStream = new FileInputStream(ruleDocDBFilePath);
+        InputStream templateStream = new FileInputStream(templateDocDBFilePath);
+        when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream);
+        when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream);
         ruleEvaluator = new RuleEvaluator(transformersFactory);
 
         // Load the original and template YAML files from the test resources directory
@@ -141,6 +155,10 @@ void test_successful_transformation_with_functionPlaceholder() throws IOExceptio
                 TEMPLATES_DIRECTORY_PATH));
         when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath);
         when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath);
+        InputStream ruleStream = new FileInputStream(ruleDocDBFilePath);
+        InputStream templateStream = new FileInputStream(templateDocDBFilePath);
+        when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream);
+        when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream);
         ruleEvaluator = new RuleEvaluator(transformersFactory);
 
         // Load the original and template YAML files from the test resources directory
@@ -171,6 +189,10 @@ void test_successful_transformation_with_complete_template() throws IOException
                 TEMPLATES_DIRECTORY_PATH));
         when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath);
         when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath);
+        InputStream ruleStream = new FileInputStream(ruleDocDBFilePath);
+        InputStream templateStream = new FileInputStream(templateDocDBFilePath);
+        when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream);
+        when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream);
         ruleEvaluator = new RuleEvaluator(transformersFactory);
 
         // Load the original and template YAML files from the test resources directory
@@ -199,6 +221,10 @@ void testInvalidJsonPathThrowsException() throws IOException {
                 TEMPLATES_DIRECTORY_PATH));
         when(transformersFactory.getPluginRuleFileLocation(pluginName)).thenReturn(ruleDocDBFilePath);
         when(transformersFactory.getPluginTemplateFileLocation(pluginName)).thenReturn(templateDocDBFilePath);
+        InputStream ruleStream = new FileInputStream(ruleDocDBFilePath);
+        InputStream templateStream = new FileInputStream(templateDocDBFilePath);
+        when(transformersFactory.getPluginRuleFileStream(pluginName)).thenReturn(ruleStream);
+        when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream);
         ruleEvaluator = new RuleEvaluator(transformersFactory);
 
         // Load the original and template YAML files from the test resources directory
diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml
index 3895ee7803..3d4d71f4df 100644
--- a/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml
+++ b/data-prepper-pipeline-parser/src/test/resources/transformation/expected/documentdb2-expected.yaml
@@ -5,6 +5,45 @@ extension:
       secret_id: "my-docdb-secret"
       region: "us-east-1"
       refresh_interval: "PT1H"
+documentdb-pipeline-writer:
+  source:
+    s3:
+      codec:
+        json: null
+      scan:
+        buckets:
+        - bucket:
+            name: "bucket-name1"
+            filter:
+              depth: "5"
+        scheduling:
+          interval: "1s"
+      acknowledgments: true
+      compression: "none"
+      aws:
+        region: "bucket-region"
+        sts_role_arn: "docdb-role"
+        sts_external_id: null
+        sts_header_overrides: null
+      delete_s3_objects_on_read: true
+  processor:
+  - string_converter:
+      upper_case: true
+  sink:
+  - opensearch:
+      exclude_keys:
+      - "_id"
+      document_version_type: "external"
+      hosts:
+      - "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com"
+      index: "index_name"
+      action: "${getMetadata(\"opensearch_action\")}"
+      document_id: "${getMetadata(\"primary_key\")}"
+      document_version: "${getMetadata(\"document_version\")}"
+      aws:
+        sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
+        region: "us-east-1"
+      index_type: "custom"
 documentdb-pipeline-reader:
   source:
     documentdb:
@@ -26,82 +65,43 @@ documentdb-pipeline-reader:
       username: "${{aws_secrets:secret:password}}"
"${{aws_secrets:secret:password}}" password: "${{aws_secrets:secret:password}}" s3_prefix: "folder1/folder2" - routes: + route: - initial_load: "getMetadata(\"ingestion_type\") == \"EXPORT\"" - stream_load: "getMetadata(\"ingestion_type\") == \"STREAM\"" sink: - - s3: null - routes: - - "initial_load" - aws: - region: "bucket-region" - sts_role_arn: "docdb-role" - sts_external_id: null - sts_header_overrides: null - bucket: "bucket-name1" - threshold: - event_collect_timeout: "120s" - maximum_size: "2mb" - aggregate_threshold: - maximum_size: "256kb" - flush_capacity_ratio: 0 - object_key: - path_prefix: "${getMetadata(\"s3_partition_key\")}" - codec: - json: null - s3: routes: - - "stream_load" - aws: - region: "bucket-region" - sts_role_arn: "docdb-role" + - "initial_load" bucket: "bucket-name1" - threshold: - event_collect_timeout: "30s" - maximum_size: "1mb" - aggregate_threshold: - maximum_size: "128mb" - flush_capacity_ratio: 0 - object_key: - path_prefix: "${getMetadata(\"s3_partition_key\")}" codec: json: null - documentdb-pipeline-writer: - source: - s3: + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + aggregate_threshold: + maximum_size: "256kb" + flush_capacity_ratio: 0 + threshold: + event_collect_timeout: "120s" + maximum_size: "2mb" + aws: + region: "bucket-region" + sts_role_arn: "docdb-role" + sts_external_id: null + sts_header_overrides: null + - s3: + routes: + - "stream_load" + bucket: "bucket-name1" codec: json: null - compression: "none" - aws: - region: "bucket-region" - sts_role_arn: "docdb-role" - sts_external_id: null - sts_header_overrides: null - acknowledgments: true - delete_s3_objects_on_read: true - scan: - buckets: - - bucket: - name: "bucket-name1" - filter: - depth: "5" - scheduling: - interval: "1s" - processor: - - string_converter: - upper_case: true - sink: - - opensearch: - exclude_keys: - - "_id" - document_version_type: "external" - hosts: - - "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" - action: "${getMetadata(\"opensearch_action\")}" - index: "index_name" - aws: - sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role" - region: "us-east-1" - document_version: "${getMetadata(\"document_version\")}" - document_id: "${getMetadata(\"primary_key\")}" - index_type: "custom" + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + threshold: + event_collect_timeout: "30s" + maximum_size: "1mb" + aws: + region: "bucket-region" + sts_role_arn: "docdb-role" diff --git a/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb2-template.yaml b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb2-template.yaml index 6bc03ea1ad..554329b246 100644 --- a/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb2-template.yaml +++ b/data-prepper-pipeline-parser/src/test/resources/transformation/templates/testSource/documentdb2-template.yaml @@ -6,30 +6,30 @@ - stream_load: 'getMetadata("ingestion_type") == "STREAM"' sink: - s3: - routes: - - initial_load - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" - bucket: "<<$.<>.source.documentdb.s3_bucket>>" - threshold: - event_collect_timeout: 
"120s" - maximum_size: "2mb" - aggregate_threshold: - maximum_size: "256kb" - flush_capacity_ratio: 0 - object_key: - path_prefix: "${getMetadata(\"s3_partition_key\")}" - codec: - json: + routes: + - initial_load + aws: + region: "<<$.<>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" + bucket: "<<$.<>.source.documentdb.s3_bucket>>" + threshold: + event_collect_timeout: "120s" + maximum_size: "2mb" + aggregate_threshold: + maximum_size: "256kb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + json: - s3: routes: - stream_load - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" + aws: + region: "<<$.<>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" bucket: "<<$.<>.source.documentdb.s3_bucket>>" threshold: event_collect_timeout: "30s" @@ -42,17 +42,17 @@ codec: json: - "<>-writer": +"<>-writer": source: s3: codec: json: compression: "none" - aws: - region: "<<$.<>.source.documentdb.s3_region>>" - sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" - sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" - sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" + aws: + region: "<<$.<>.source.documentdb.s3_region>>" + sts_role_arn: "<<$.<>.source.documentdb.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.documentdb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.documentdb.aws.sts_header_overrides>>" acknowledgments: true delete_s3_objects_on_read: true scan: