Skip to content

Commit

Permalink
Add support for dynamic rule detection for pipeline config transforma…
Browse files Browse the repository at this point in the history
…tion (#4601)

* Add support for dynamic rule detection for pipeline config transformation

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Address comments

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Move rules and templates to plugin level

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Add dummy plugin for testing dynamic rule detection

Signed-off-by: Srikanth Govindarajan <[email protected]>

* Address comments

Signed-off-by: Srikanth Govindarajan <[email protected]>

---------

Signed-off-by: Srikanth Govindarajan <[email protected]>
Signed-off-by: Srikanth Govindarajan <[email protected]>
  • Loading branch information
srikanthjg authored Aug 17, 2024
1 parent 1ac6df2 commit 38f8079
Show file tree
Hide file tree
Showing 23 changed files with 574 additions and 389 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,12 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.inject.Named;
import java.nio.file.Path;
import java.nio.file.Paths;

@Configuration
public class PipelineTransformationConfiguration {
public static final String TEMPLATES_DIRECTORY_PATH = "TEMPLATES_DIRECTORY_PATH";
public static final String RULES_DIRECTORY_PATH = "RULES_DIRECTORY_PATH";
private static final Path currentDir = Paths.get(System.getProperty("user.dir"));
// private static final String parserRelativePath = "/data-prepper-pipeline-parser/src";

@Bean
@Named(RULES_DIRECTORY_PATH)
static String provideRulesDirectoryPath() {
ClassLoader classLoader = PipelineTransformationConfiguration.class.getClassLoader();
String filePath = classLoader.getResource("rules").getFile();
return filePath;
}

@Bean
@Named(TEMPLATES_DIRECTORY_PATH)
static String provideTemplateDirectoryPath() {
ClassLoader classLoader = PipelineTransformationConfiguration.class.getClassLoader();
String filePath = classLoader.getResource("templates").getFile();
return filePath;
}

@Bean
TransformersFactory transformersFactory(
@Named(RULES_DIRECTORY_PATH) String rulesDirectoryPath,
@Named(TEMPLATES_DIRECTORY_PATH) String templatesDirectoryPath
) {
return new TransformersFactory(rulesDirectoryPath, templatesDirectoryPath);
TransformersFactory transformersFactory() {
return new TransformersFactory();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
package org.opensearch.dataprepper.pipeline.parser.rule;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.PathNotFoundException;
import com.jayway.jsonpath.ReadContext;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
Expand All @@ -25,6 +24,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;

Expand All @@ -34,52 +34,37 @@ public class RuleEvaluator {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
private final TransformersFactory transformersFactory;
private String PLUGIN_NAME = null;


public RuleEvaluator(TransformersFactory transformersFactory) {
this.transformersFactory = transformersFactory;
}

public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelineModel) {
//TODO - Dynamically scan the rules folder and get the corresponding template.
return isDocDBSource(pipelineModel);
}

/**
* Evaluates model based on pre defined rules and
* result contains the name of the pipeline that will need transformation,
* evaluated boolean result and the corresponding template model
* Assumption: only one pipeline can have transformation.
*
* @param pipelinesModel
* @return RuleEvaluatorResult
*/
private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel) {
PLUGIN_NAME = "documentdb";

Map<String, PipelineModel> pipelines = pipelinesModel.getPipelines();
Map<String, PipelineModel> pipelines = pipelineModel.getPipelines();
for (Map.Entry<String, PipelineModel> entry : pipelines.entrySet()) {
try {
String pipelineJson = OBJECT_MAPPER.writeValueAsString(entry);
if (evaluate(pipelineJson, PLUGIN_NAME)) {
LOG.info("Rule for {} is evaluated true for pipelineJson {}", PLUGIN_NAME, pipelineJson);
RuleFileEvaluation ruleFileEvaluation = evaluate(pipelineJson);

if (ruleFileEvaluation.getResult()) {
String pluginName = ruleFileEvaluation.getPluginName();
LOG.info("Applying rule {}",ruleFileEvaluation.getRuleFileName().toString());
LOG.info("Rule for {} is evaluated true for pipelineJson {}", pluginName, pipelineJson);

InputStream templateStream = transformersFactory.getPluginTemplateFileStream(PLUGIN_NAME);
InputStream templateStream = transformersFactory.getPluginTemplateFileStream(pluginName);
PipelineTemplateModel templateModel = yamlMapper.readValue(templateStream,
PipelineTemplateModel.class);
LOG.info("Template is chosen for {}", PLUGIN_NAME);
LOG.info("Template is chosen for {}", pluginName);

return RuleEvaluatorResult.builder()
.withEvaluatedResult(true)
.withPipelineTemplateModel(templateModel)
.withPipelineName(entry.getKey())
.build();
}
} catch (FileNotFoundException e){
LOG.error("Template File Not Found for {}", PLUGIN_NAME);
throw new RuntimeException(e);
}
catch (JsonProcessingException e) {
} catch (JsonProcessingException e) {
LOG.error("Error processing json");
throw new RuntimeException(e);
} catch (IOException e) {
Expand All @@ -94,47 +79,68 @@ private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel)
.build();
}

private Boolean evaluate(String pipelinesJson,
String pluginName) {

private RuleFileEvaluation evaluate(String pipelinesJson) {
Configuration parseConfig = Configuration.builder()
.jsonProvider(new JacksonJsonProvider())
.jsonProvider(new JacksonJsonNodeJsonProvider())
.mappingProvider(new JacksonMappingProvider())
.options(Option.AS_PATH_LIST)
.options(Option.SUPPRESS_EXCEPTIONS)
.build();
ParseContext parseContext = JsonPath.using(parseConfig);
ReadContext readPathContext = parseContext.parse(pipelinesJson);

RuleTransformerModel rulesModel = null;
InputStream ruleStream = null;

try {
ruleStream = transformersFactory.getPluginRuleFileStream(pluginName);
Collection<RuleStream> ruleStreams = transformersFactory.loadRules();

rulesModel = yamlMapper.readValue(ruleStream, RuleTransformerModel.class);
List<String> rules = rulesModel.getApplyWhen();
for (String rule : rules) {
try {
Object result = readPathContext.read(rule);
} catch (PathNotFoundException e) {
LOG.warn("Json Path not found for {}", pluginName);
return false;
}
}
} catch (FileNotFoundException e){
LOG.warn("Rule File Not Found for {}", pluginName);
return false;
} catch(IOException e){
throw new RuntimeException(e);
}finally {
if (ruleStream != null) {
//walk through all rules and return first valid
for (RuleStream ruleStream : ruleStreams) {
try {
rulesModel = yamlMapper.readValue(ruleStream.getRuleStream(), RuleTransformerModel.class);
List<String> rules = rulesModel.getApplyWhen();
String pluginName = rulesModel.getPluginName();
boolean allRulesValid = true;

for (String rule : rules) {
try {
JsonNode result = JsonPath.using(parseConfig).parse(pipelinesJson).read(rule);
if (result == null || result.size() == 0) {
allRulesValid = false;
break;
}
} catch (PathNotFoundException e) {
LOG.debug("Json Path not found for {}", ruleStream.getName());
allRulesValid = false;
break;
}
}

if (allRulesValid) {
return RuleFileEvaluation.builder()
.withPluginName(pluginName)
.withRuleFileName(ruleStream.getName())
.withResult(true)
.build();
}
} finally {
ruleStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

} catch (FileNotFoundException e) {
LOG.debug("Rule File Not Found", e);
return RuleFileEvaluation.builder()
.withPluginName(null)
.withRuleFileName(null)
.withResult(false)
.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
return true;

return RuleFileEvaluation.builder()
.withPluginName(null)
.withRuleFileName(null)
.withResult(false)
.build();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.opensearch.dataprepper.pipeline.parser.rule;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Builder(setterPrefix = "with")
@AllArgsConstructor
@Data
public class RuleFileEvaluation {
private Boolean result;
private String ruleFileName;
private String pluginName;

public RuleFileEvaluation() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.opensearch.dataprepper.pipeline.parser.rule;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.IOException;
import java.io.InputStream;

@Data
@AllArgsConstructor
public class RuleStream {
private String name;
private InputStream ruleStream;


public void close() {
if (ruleStream != null) {
try {
ruleStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,31 @@
*/
package org.opensearch.dataprepper.pipeline.parser.rule;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.List;

@Data
@AllArgsConstructor
public class RuleTransformerModel {

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("apply_when")
private List<String> applyWhen;

public RuleTransformerModel() {
}

public RuleTransformerModel(List<String> applyWhen) {
this.applyWhen = applyWhen;
}
@JsonProperty("plugin_name")
private String pluginName;

public List<String> getApplyWhen() {
return applyWhen;
}

public void setApplyWhen(List<String> applyWhen) {
this.applyWhen = applyWhen;
public RuleTransformerModel() {
}

@Override
public String toString() {
return "RuleConfiguration{" +
"applyWhen=" + applyWhen +
'}';
"\npluginName="+ pluginName +'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.opensearch.dataprepper.pipeline.parser.rule;

import java.io.IOException;
import java.io.InputStream;

public class TemplateStream {
private String name;
private InputStream templateStream;


public void close() {
if (templateStream != null) {
try {
templateStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
Loading

0 comments on commit 38f8079

Please sign in to comment.