Skip to content

Commit

Permalink
Changing logging level for config transformation and fixing rule (opensearch-project#4466)
Browse files Browse the repository at this point in the history

* Changing logging level for config transformation and fixing rule

Signed-off-by: srigovs <[email protected]>

* Adding absolute path for template and rule

Signed-off-by: srigovs <[email protected]>

* Fix template indentation

Signed-off-by: srigovs <[email protected]>

* Fix template indentation

Signed-off-by: srigovs <[email protected]>

* Adding event json to codec in template

Signed-off-by: srigovs <[email protected]>

* Handled PipelinesDataFlow mapping and addressed array type when replacing Node

Signed-off-by: srigovs <[email protected]>

* Add relative path as file stream

Signed-off-by: srigovs <[email protected]>

* Fix environment variable

Signed-off-by: srigovs <[email protected]>

---------

Signed-off-by: srigovs <[email protected]>
  • Loading branch information
srikanthjg authored Apr 26, 2024
1 parent aea97da commit 1007317
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,38 @@
import org.springframework.context.annotation.Configuration;

import javax.inject.Named;

import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;

@Configuration
public class PipelineTransformationConfiguration {
public static final String TEMPLATES_DIRECTORY_PATH = "TEMPLATES_DIRECTORY_PATH";
public static final String RULES_DIRECTORY_PATH = "VALIDATORS_DIRECTORY_PATH";
public static final String RULES_DIRECTORY_PATH = "RULES_DIRECTORY_PATH";
private static final Path currentDir = Paths.get(System.getProperty("user.dir"));
// private static final String parserRelativePath = "/data-prepper-pipeline-parser/src";

@Bean
@Named(RULES_DIRECTORY_PATH)
static String provideRulesDirectoryPath() {
    // Resolve the rules directory from the classpath instead of a relative
    // filesystem path, so lookup does not depend on the working directory.
    ClassLoader classLoader = PipelineTransformationConfiguration.class.getClassLoader();
    URL rulesUrl = classLoader.getResource("rules");
    if (rulesUrl == null) {
        // Fail fast with a clear message rather than the NPE the bare
        // getResource(...).getFile() chain would raise.
        throw new IllegalStateException("Classpath resource directory 'rules' not found");
    }
    // NOTE(review): URL.getFile() only yields a usable filesystem path when the
    // resource is an exploded directory on disk, not inside a jar, and it does
    // not decode percent-escapes -- confirm packaging assumptions.
    return rulesUrl.getFile();
}

@Bean
@Named(TEMPLATES_DIRECTORY_PATH)
static String provideTemplateDirectoryPath() {
    // Resolve the templates directory from the classpath instead of a relative
    // filesystem path, so lookup does not depend on the working directory.
    ClassLoader classLoader = PipelineTransformationConfiguration.class.getClassLoader();
    URL templatesUrl = classLoader.getResource("templates");
    if (templatesUrl == null) {
        // Fail fast with a clear message rather than the NPE the bare
        // getResource(...).getFile() chain would raise.
        throw new IllegalStateException("Classpath resource directory 'templates' not found");
    }
    // NOTE(review): URL.getFile() only yields a usable filesystem path when the
    // resource is an exploded directory on disk, not inside a jar, and it does
    // not decode percent-escapes -- confirm packaging assumptions.
    return templatesUrl.getFile();
}

@Bean
TransformersFactory transformersFactory(
@Named(TEMPLATES_DIRECTORY_PATH) String provideTransformerDirectoryPath,
@Named(RULES_DIRECTORY_PATH) String provideTemplateDirectoryPath
@Named(RULES_DIRECTORY_PATH) String rulesDirectoryPath,
@Named(TEMPLATES_DIRECTORY_PATH) String templatesDirectoryPath
) {
return new TransformersFactory(RULES_DIRECTORY_PATH, TEMPLATES_DIRECTORY_PATH);
return new TransformersFactory(rulesDirectoryPath, templatesDirectoryPath);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -53,19 +53,25 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin
*/
private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel) {
PLUGIN_NAME = "documentdb";
String pluginRulesPath = transformersFactory.getPluginRuleFileLocation(PLUGIN_NAME);
// String pluginRulesPath = transformersFactory.getPluginRuleFileLocation(PLUGIN_NAME);
// File ruleFile = new File(pluginRulesPath);
// LOG.info("Checking rule path {}",ruleFile.getAbsolutePath());

Map<String, PipelineModel> pipelines = pipelinesModel.getPipelines();

for (Map.Entry<String, PipelineModel> entry : pipelines.entrySet()) {
try {
String pipelineJson = OBJECT_MAPPER.writeValueAsString(entry);
if (evaluate(pipelineJson, pluginRulesPath)) {
LOG.debug("Rule path {} is evaluated true for pipelineJson {}",pluginRulesPath, pipelineJson);
if (evaluate(pipelineJson, PLUGIN_NAME)) {
LOG.info("Rule for {} is evaluated true for pipelineJson {}", PLUGIN_NAME, pipelineJson);

String templateFilePath = transformersFactory.getPluginTemplateFileLocation(PLUGIN_NAME);
PipelineTemplateModel templateModel = yamlMapper.readValue(new File(templateFilePath),
// String templateFilePathString = transformersFactory.getPluginTemplateFileLocation(PLUGIN_NAME);
// File templateFile = new File(templateFilePathString);
// LOG.info("Absolute path of template file: {}",templateFile.getAbsolutePath());
InputStream templateStream = transformersFactory.getPluginTemplateFileStream(PLUGIN_NAME);
PipelineTemplateModel templateModel = yamlMapper.readValue(templateStream,
PipelineTemplateModel.class);
LOG.debug("Chosen template file {}",templateFilePath);
LOG.info("Template is chosen for {}", PLUGIN_NAME);

return RuleEvaluatorResult.builder()
.withEvaluatedResult(true)
Expand All @@ -89,7 +95,7 @@ private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel)
}

private Boolean evaluate(String pipelinesJson,
String rulePath) {
String pluginName) {

Configuration parseConfig = Configuration.builder()
.jsonProvider(new JacksonJsonProvider())
Expand All @@ -100,16 +106,20 @@ private Boolean evaluate(String pipelinesJson,
ReadContext readPathContext = parseContext.parse(pipelinesJson);

try {
RuleTransformerModel rulesModel = yamlMapper.readValue(new File(rulePath), RuleTransformerModel.class);
//TODO
// ClassLoader classLoader = RuleEvaluator.class.getClassLoader();
// InputStream filestream = classLoader.getResourceAsStream("rules/documentdb-rule.yaml");
InputStream ruleStream = transformersFactory.getPluginRuleFileStream(pluginName);
RuleTransformerModel rulesModel = yamlMapper.readValue(ruleStream, RuleTransformerModel.class);
List<String> rules = rulesModel.getApplyWhen();
for (String rule : rules) {
Object result = readPathContext.read(rule);
}
} catch (IOException e) {
LOG.warn("Error reading file {}", rulePath);
LOG.warn("Error reading {} rule",pluginName);
return false;
} catch (PathNotFoundException e) {
LOG.warn("Path not found {}", rulePath);
LOG.warn("Json Path not found for {}", pluginName);
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransforme
private static final String JSON_PATH_ARRAY_DISAMBIGUATOR_PATTERN = "[?(@.";
private static final String RECURSIVE_JSON_PATH_PATH = "$..";
private static final String JSON_PATH_IDENTIFIER = "$.";
private static final String ARRAY_NODE_PATTERN = "([^\\[]+)\\[(\\d+)\\]$";
private static final String SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE = "SOURCE_COORDINATION_PIPELINE_IDENTIFIER";


Expand Down Expand Up @@ -102,12 +104,14 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT

if (!ruleEvaluatorResult.isEvaluatedResult() ||
ruleEvaluatorResult.getPipelineName() == null) {
LOG.info("No transformation needed");
return preTransformedPipelinesDataFlowModel;
}

//To differentiate between sub-pipelines that don't need transformation.
String pipelineNameThatNeedsTransformation = ruleEvaluatorResult.getPipelineName();
PipelineTemplateModel templateModel = ruleEvaluatorResult.getPipelineTemplateModel();
LOG.info("Transforming pipeline config for pipeline {}",pipelineNameThatNeedsTransformation);
try {

Map<String, PipelineModel> pipelines = preTransformedPipelinesDataFlowModel.getPipelines();
Expand All @@ -121,6 +125,7 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
//Replace pipeline name placeholder with pipelineNameThatNeedsTransformation
String templateJsonString = replaceTemplatePipelineName(templateJsonStringWithPipelinePlaceholder,
pipelineNameThatNeedsTransformation);
LOG.info("Template - {}",templateJsonString);

// Find all PLACEHOLDER_PATTERN in template json string
Map<String, List<String>> placeholdersMap = findPlaceholdersWithPathsRecursively(templateJsonString);
Expand Down Expand Up @@ -169,19 +174,18 @@ public PipelinesDataFlowModel transformConfiguration(PipelinesDataFlowModel preT
* @throws JsonProcessingException
*/
private PipelinesDataFlowModel getTransformedPipelinesDataFlowModel(String pipelineNameThatNeedsTransformation, PipelinesDataFlowModel preTransformedPipelinesDataFlowModel, JsonNode templateRootNode) throws JsonProcessingException {

//update template json
String transformedJson = objectMapper.writeValueAsString(templateRootNode);
LOG.debug("{} pipeline has been transformed to :{}", pipelineNameThatNeedsTransformation, transformedJson);
JsonNode transformedJsonNode = templateRootNode.get(TEMPLATE_PIPELINE_ROOT_STRING);
String transformedJson = objectMapper.writeValueAsString(transformedJsonNode);
LOG.info("{} pipeline has been transformed to :{}", pipelineNameThatNeedsTransformation, transformedJson);

//convert TransformedJson to PipelineModel with the data from preTransformedDataFlowModel.
//transform transformedJson to Map
Map<String, Object> transformedConfigMap = objectMapper.readValue(transformedJson, Map.class);
PipelinesDataFlowModel transformedSinglePipelineDataFlowModel = objectMapper.readValue(transformedJson, PipelinesDataFlowModel.class);
Map<String, PipelineModel> transformedPipelines = transformedSinglePipelineDataFlowModel.getPipelines();

Map<String, PipelineModel> pipelines = preTransformedPipelinesDataFlowModel.getPipelines();

// get the root of the Transformed Pipeline Model, to get the actual pipelines.
// direct conversion to PipelineDataModel throws exception.
Map<String, PipelineModel> transformedPipelines = (Map<String, PipelineModel>) transformedConfigMap.get(TEMPLATE_PIPELINE_ROOT_STRING);
pipelines.forEach((pipelineName, pipeline) -> {
if (!pipelineName.equals(pipelineNameThatNeedsTransformation)) {
transformedPipelines.put(pipelineName, pipeline);
Expand All @@ -194,7 +198,7 @@ private PipelinesDataFlowModel getTransformedPipelinesDataFlowModel(String pipel
transformedPipelines
);
String transformedPipelinesDataFlowModelJson = objectMapper.writeValueAsString(transformedPipelinesDataFlowModel);
LOG.debug("Transformed PipelinesDataFlowModel: {}", transformedPipelinesDataFlowModelJson);
LOG.info("Transformed PipelinesDataFlowModel: {}", transformedPipelinesDataFlowModelJson);

return transformedPipelinesDataFlowModel;
}
Expand Down Expand Up @@ -311,6 +315,9 @@ private String executeFunctionPlaceholder(String functionPlaceholderValue, Strin
private Object parseParameter(String parameter, String pipelineJson) {
if(isJsonPath(parameter)){
JsonNode pipelineNode = JsonPath.using(parseConfigWithJsonNode).parse(pipelineJson).read(parameter);
if(pipelineNode==null){
return null;
}
if(!pipelineNode.isValueNode()){
throw new RuntimeException("parameter has to be a value node");
}
Expand Down Expand Up @@ -343,11 +350,17 @@ private boolean isJsonPath(String parameter) {
* @return
*/
/**
 * Computes the partition-depth value used by the template: the number of
 * "/"-separated segments in the S3 prefix plus a fixed offset of 3.
 * A null prefix contributes zero segments, so the result is "3".
 *
 * @param s3Prefix the configured S3 prefix, may be null
 * @return the depth as a decimal string
 */
public String calculateDepth(String s3Prefix) {
    final int prefixSegments = (s3Prefix == null) ? 0 : s3Prefix.split("/").length;
    return String.valueOf(prefixSegments + 3);
}

/**
 * Builds the source-coordination identifier from the
 * SOURCE_COORDINATION_PIPELINE_IDENTIFIER environment variable, prefixing it
 * with the given S3 prefix when one is configured.
 *
 * @param s3Prefix the configured S3 prefix, may be null
 * @return the identifier, or "prefix/identifier" when a prefix is present
 */
public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix) {
    final String coordinationIdentifier =
            System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
    return (s3Prefix == null)
            ? coordinationIdentifier
            : s3Prefix + "/" + coordinationIdentifier;
}

Expand Down Expand Up @@ -383,18 +396,32 @@ public Object invokeMethod(String methodName, Class<?> parameterType, Object arg
public void replaceNode(JsonNode root, String jsonPath, JsonNode newNode) {
try {
if (newNode == null) {
// throw new PathNotFoundException(format("jsonPath {} not found", jsonPath));
LOG.debug("Did not find jsonPath {}",jsonPath);
LOG.info("Did not find jsonPath {}",jsonPath);
}
// Read the parent path of the target node
String parentPath = jsonPath.substring(0, jsonPath.lastIndexOf('.'));
String fieldName = jsonPath.substring(jsonPath.lastIndexOf('.') + 1);

//Handle if fieldName is an array
Pattern pattern = Pattern.compile(ARRAY_NODE_PATTERN);
Matcher matcher = pattern.matcher(fieldName);

// Find the parent node
JsonNode parentNode = JsonPath.using(parseConfigWithJsonNode).parse(root).read(parentPath);

// Replace the target field in the parent node
if (parentNode != null && parentNode instanceof ObjectNode) {
if(matcher.find()){
//Handle array
String field = matcher.group(1);
int index = Integer.parseInt(matcher.group(2));
JsonNode arrayNodeResult = JsonPath.using(parseConfigWithJsonNode).parse(root).read(parentPath+"."+field);
if (!(arrayNodeResult instanceof ArrayNode)){
throw new RuntimeException("Json path is of array type, but parsed result is not arrayNode");
}
ArrayNode arrayNode = (ArrayNode) arrayNodeResult;
// Replace the element in the array
arrayNode.set(index, newNode);
} else if (parentNode != null && parentNode instanceof ObjectNode) {
((ObjectNode) parentNode).replace(fieldName, newNode);
} else {
LOG.error("Path does not point to object node");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

public class TransformersFactory implements PipelineTransformationPathProvider {

Expand All @@ -22,7 +23,6 @@ public class TransformersFactory implements PipelineTransformationPathProvider {
private final String RULE_FILE_NAME_PATTERN = "-rule.yaml";
private final String templatesDirectoryPath;
private final String rulesDirectoryPath;
String PLUGIN_NAME = null;

public TransformersFactory(@Named(RULES_DIRECTORY_PATH) final String rulesDirectoryPath,
@Named(TEMPLATES_DIRECTORY_PATH) final String templatesDirectoryPath) {
Expand Down Expand Up @@ -54,6 +54,24 @@ public String getPluginRuleFileLocation(String pluginName) {
return rulesDirectoryPath + "/" + pluginName + RULE_FILE_NAME_PATTERN;
}

/**
 * Opens the transformation rule file for the given plugin as a classpath
 * resource stream (works both from an exploded build and from inside a jar).
 *
 * @param pluginName name of the plugin, e.g. "documentdb"
 * @return an open stream over rules/&lt;pluginName&gt;-rule.yaml; caller is
 *         responsible for closing it
 * @throws RuntimeException if the plugin name is missing or the rule file is
 *         not on the classpath
 */
public InputStream getPluginRuleFileStream(String pluginName) {
    // Guard against null as well as empty; the original only checked isEmpty()
    // and would NPE on a null plugin name.
    if (pluginName == null || pluginName.isEmpty()) {
        throw new RuntimeException("Transformation plugin not found");
    }
    ClassLoader classLoader = TransformersFactory.class.getClassLoader();
    InputStream ruleStream =
            classLoader.getResourceAsStream("rules/" + pluginName + RULE_FILE_NAME_PATTERN);
    if (ruleStream == null) {
        // getResourceAsStream returns null for a missing resource; surface a
        // clear error instead of letting callers NPE on the null stream.
        throw new RuntimeException("Rule file not found on classpath for plugin: " + pluginName);
    }
    return ruleStream;
}

/**
 * Opens the transformation template file for the given plugin as a classpath
 * resource stream (works both from an exploded build and from inside a jar).
 *
 * @param pluginName name of the plugin, e.g. "documentdb"
 * @return an open stream over templates/&lt;pluginName&gt;-template.yaml
 *         (per TEMPLATE_FILE_NAME_PATTERN); caller is responsible for closing it
 * @throws RuntimeException if the plugin name is missing or the template file
 *         is not on the classpath
 */
public InputStream getPluginTemplateFileStream(String pluginName) {
    // Guard against null as well as empty; the original only checked isEmpty()
    // and would NPE on a null plugin name.
    if (pluginName == null || pluginName.isEmpty()) {
        throw new RuntimeException("Transformation plugin not found");
    }
    ClassLoader classLoader = TransformersFactory.class.getClassLoader();
    InputStream templateStream =
            classLoader.getResourceAsStream("templates/" + pluginName + TEMPLATE_FILE_NAME_PATTERN);
    if (templateStream == null) {
        // getResourceAsStream returns null for a missing resource; surface a
        // clear error instead of letting callers NPE on the null stream.
        throw new RuntimeException("Template file not found on classpath for plugin: " + pluginName);
    }
    return templateStream;
}

public PipelineTemplateModel getTemplateModel(String pluginName) {
String templatePath = getPluginTemplateFileLocation(pluginName);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Transformation rule for the documentdb source plugin.
# The pipeline transformation is applied only when EVERY JsonPath expression
# below resolves against the pipeline configuration JSON (a missing path
# causes the rule evaluation to return false).
apply_when:
- "$..source.documentdb"
- "$..source.documentdb.s3_bucket"
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"<<pipeline-name>>":
workers: "<<$.<<pipeline-name>>.workers>>"
delay: "<<$.<<pipeline-name>>.delay>>"
buffer: "<<$.<<pipeline-name>>.buffer>>"
source:
documentdb: "<<$.<<pipeline-name>>.source.documentdb>>"
routes:
- initial_load: 'getMetadata("ingestion_type") == "EXPORT"'
- stream_load: 'getMetadata("ingestion_type") == "STREAM"'
sink:
- s3:
routes:
- initial_load
aws:
region: "<<$.<<pipeline-name>>.source.documentdb.s3_region>>"
sts_role_arn: "<<$.<<pipeline-name>>.source.documentdb.opensearch.aws.sts_role_arn>>"
sts_external_id: "<<$.<<pipeline-name>>.source.documentdb.opensearch.aws.sts_external_id>>"
sts_header_overrides: "<<$.<<pipeline-name>>.source.documentdb.opensearch.aws.sts_header_overrides>>"
bucket: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>"
threshold:
event_collect_timeout: "120s"
maximum_size: "2mb"
aggregate_threshold:
maximum_size: "256kb"
flush_capacity_ratio: 0
object_key:
path_prefix: "${getMetadata(\"s3_partition_key\")}"
codec:
event_json:
- s3:
routes:
- stream_load
aws:
region: "<<$.<<pipeline-name>>.source.documentdb.s3_region>>"
sts_role_arn: "<<$.<<pipeline-name>>.source.documentdb.opensearch.aws.sts_role_arn>>"
sts_external_id: "<<$.<<pipeline-name>>.source.documentdb.opensearch.aws.sts_external_id>>"
sts_header_overrides: "<<$.<<pipeline-name>>.source.documentdb.opensearch.aws.sts_header_overrides>>"
bucket: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>"
threshold:
event_collect_timeout: "30s"
maximum_size: "1mb"
aggregate_threshold:
maximum_size: "128mb"
flush_capacity_ratio: 0
object_key:
path_prefix: "${getMetadata(\"s3_partition_key\")}"
codec:
event_json:

"<<pipeline-name>>-s3":
workers: "<<$.<<pipeline-name>>.workers>>"
delay: "<<$.<<pipeline-name>>.delay>>"
buffer: "<<$.<<pipeline-name>>.buffer>>"
source:
s3:
codec:
event_json:
compression: "none"
aws:
region: "<<$.<<pipeline-name>>.source.documentdb.s3_region>>"
sts_role_arn: "<<$.<<pipeline-name>>.source.documentdb.opensearch.aws.sts_role_arn>>"
sts_external_id: "<<$.<<pipeline-name>>.source.documentdb.opensearch.aws.sts_external_id>>"
sts_header_overrides: "<<$.<<pipeline-name>>.source.documentdb.opensearch.aws.sts_header_overrides>>"
acknowledgments: true
delete_s3_objects_on_read: true
scan:
buckets:
- bucket:
name: "<<$.<<pipeline-name>>.source.documentdb.s3_bucket>>"
filter:
include_prefix: ["<<FUNCTION_NAME:getSourceCoordinationIdentifierEnvVariable,PARAMETER:$.<<pipeline-name>>.source.documentdb.s3_prefix>>"]
scheduling:
interval: "60s"
processor: "<<$.<<pipeline-name>>.processor>>"
sink: "<<$.<<pipeline-name>>.sink>>"
routes: "<<$.<<pipeline-name>>.routes>>"

This file was deleted.

Loading

0 comments on commit 1007317

Please sign in to comment.