Skip to content

Commit

Permalink
Pipeline Configuration Transformation (opensearch-project#4446)
Browse files Browse the repository at this point in the history
* Adding templates

Signed-off-by: srigovs <[email protected]>

* Added Dynamic yaml transformer

Signed-off-by: srigovs <[email protected]>

* Added Rule evaluator

Signed-off-by: srigovs <[email protected]>

* Added rule evaluator

Signed-off-by: srigovs <[email protected]>

* Added json walk

Signed-off-by: srigovs <[email protected]>

* Add transformation logic

Signed-off-by: srigovs <[email protected]>

* Add dynamic rule

Signed-off-by: srigovs <[email protected]>

* Almost working

Signed-off-by: srigovs <[email protected]>

* Adding multiple pipelines part1

Signed-off-by: srigovs <[email protected]>

* Adding multiple pipelines part2-incomplete

Signed-off-by: srigovs <[email protected]>

* Works e2e for 1 pipeline

Signed-off-by: srigovs <[email protected]>

* Added multi pipeline template and pipelinemodel support, works for docDB with big template

Signed-off-by: srigovs <[email protected]>

* added tests for models and fixed beans, one more fix needed for bean

Signed-off-by: srigovs <[email protected]>

* Fixed IT and beans

Signed-off-by: srigovs <[email protected]>

* Update bean to have only pipelineDataModel and not parser

Signed-off-by: srigovs <[email protected]>

* Add banner

Signed-off-by: srigovs <[email protected]>

* Code cleanup and add comments

Signed-off-by: srigovs <[email protected]>

* Support user pipeline configuration dynamic transformation based on
templates and rules

Signed-off-by: srigovs <[email protected]>

* Address comments

Signed-off-by: srigovs <[email protected]>

* Added Function Call support in templates

Signed-off-by: srigovs <[email protected]>

* Added Function Call support in templates

Signed-off-by: srigovs <[email protected]>

* Modify documentDB template.

Signed-off-by: srigovs <[email protected]>

* Code clean up

Signed-off-by: srigovs <[email protected]>

* Code clean up

Signed-off-by: srigovs <[email protected]>

---------

Signed-off-by: srigovs <[email protected]>
  • Loading branch information
srikanthjg authored Apr 25, 2024
1 parent 8bd7fc9 commit e590fde
Show file tree
Hide file tree
Showing 47 changed files with 2,224 additions and 23 deletions.
2 changes: 1 addition & 1 deletion data-prepper-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies {
jacocoTestCoverageVerification {
dependsOn(jacocoTestReport)
violationRules {
rule { //in addition to core projects rule - this one checks for 100% code coverage for this project
rule {
limit {
minimum = 1.0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
@JsonDeserialize(using = SinkModel.SinkModelDeserializer.class)
public class SinkModel extends PluginModel {

SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, pluginSettings));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public Map<String, Pipeline> transformConfiguration() {
Map.Entry::getKey,
entry -> new PipelineConfiguration(entry.getValue())
));

final List<String> allPipelineNames = PipelineConfigurationValidator.validateAndGetPipelineNames(pipelineConfigurationMap);

// LinkedHashMap to preserve insertion order
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,32 @@
package org.opensearch.dataprepper.parser.config;

import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineTransformer;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderProvider;
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationFileReader;
import org.opensearch.dataprepper.pipeline.parser.PipelineConfigurationReader;
import org.opensearch.dataprepper.pipeline.parser.PipelinesDataflowModelParser;
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
import org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer;
import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineConfigurationTransformer;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.springframework.context.annotation.Primary;

@Configuration
@ComponentScan(basePackages = {
"org.opensearch.dataprepper.pipeline.parser",
"org.opensearch.dataprepper.plugin"
})
public class PipelineParserConfiguration {

@Bean
Expand All @@ -35,7 +45,7 @@ public PipelineTransformer pipelineParser(
final EventFactory eventFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory
) {
) {
return new PipelineTransformer(pipelinesDataFlowModel,
pluginFactory,
peerForwarderProvider,
Expand All @@ -60,7 +70,21 @@ public PipelinesDataflowModelParser pipelinesDataflowModelParser(
}

@Bean
public PipelineConfigurationTransformer pipelineConfigTransformer(RuleEvaluator ruleEvaluator) {
return new DynamicConfigTransformer(ruleEvaluator);
}


@Bean(name = "pipelinesDataFlowModel")
@Primary
public PipelinesDataFlowModel pipelinesDataFlowModel(
PipelineConfigurationTransformer pipelineConfigTransformer,
@Qualifier("preTransformedDataFlowModel") PipelinesDataFlowModel preTransformedDataFlowModel) {
return pipelineConfigTransformer.transformConfiguration(preTransformedDataFlowModel);
}

@Bean(name = "preTransformedDataFlowModel")
public PipelinesDataFlowModel preTransformedDataFlowModel(
final PipelinesDataflowModelParser pipelinesDataflowModelParser) {
return pipelinesDataflowModelParser.parseConfiguration();
}
Expand Down
23 changes: 22 additions & 1 deletion data-prepper-pipeline-parser/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,28 @@ group = 'org.opensearch.dataprepper.core'
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:blocking-buffer')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'org.apache.commons:commons-collections4:4.4'
implementation 'org.projectlombok:lombok:1.18.22'
implementation 'com.jayway.jsonpath:json-path:2.6.0'
implementation 'javax.inject:javax.inject:1'
implementation(libs.spring.core) {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
testImplementation testLibs.bundles.junit
testImplementation testLibs.bundles.mockito
testImplementation testLibs.hamcrest
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
testImplementation 'org.powermock:powermock-api-mockito2:2.0.9'
testImplementation 'org.assertj:assertj-core:3.20.2'
testImplementation 'junit:junit:4.13.2'
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
testImplementation 'org.powermock:powermock-api-mockito2:2.0.9'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.dataprepper.pipeline.parser;

import static java.lang.String.format;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -13,8 +14,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.lang.String.format;

public class PipelineConfigurationFileReader implements PipelineConfigurationReader {
private static final Logger LOG = LoggerFactory.getLogger(PipelineConfigurationFileReader.class);
private final String pipelineConfigurationFileLocation;
Expand All @@ -41,6 +40,7 @@ private List<InputStream> getInputStreamsForConfigurationFiles() {
return inputStreams;
} else if (configurationLocation.isDirectory()) {
FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml"));

List<InputStream> inputStreams = Stream.of(configurationLocation.listFiles(yamlFilter))
.map(this::getInputStreamForFile)
.filter(Objects::nonNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
public interface PipelineConfigurationReader {

/**
*
* @return a List of InputStream that contains each of the pipeline configurations.
* the caller of this method is responsible for closing these input streams after they are used
* the caller of this method is responsible for closing these input streams after they are used
*/
List<InputStream> getPipelineConfigurationInputStreams();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.pipeline.parser;

import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.inject.Named;

@Configuration
public class PipelineTransformationConfiguration {
public static final String TEMPLATES_DIRECTORY_PATH = "TEMPLATES_DIRECTORY_PATH";
public static final String RULES_DIRECTORY_PATH = "VALIDATORS_DIRECTORY_PATH";

@Bean
@Named(RULES_DIRECTORY_PATH)
static String provideRulesDirectoryPath() {
return "resources/rules";
}

@Bean
@Named(TEMPLATES_DIRECTORY_PATH)
static String provideTemplateDirectoryPath() {
return "resources/templates";
}

@Bean
TransformersFactory transformersFactory(
@Named(TEMPLATES_DIRECTORY_PATH) String provideTransformerDirectoryPath,
@Named(RULES_DIRECTORY_PATH) String provideTemplateDirectoryPath
) {
return new TransformersFactory(RULES_DIRECTORY_PATH, TEMPLATES_DIRECTORY_PATH);
}

@Bean
public RuleEvaluator ruleEvaluator(TransformersFactory transformersFactory) {
return new RuleEvaluator(transformersFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import static java.lang.String.format;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
Expand All @@ -22,8 +23,6 @@
import java.util.Objects;
import java.util.stream.Collectors;

import static java.lang.String.format;

public class PipelinesDataflowModelParser {
private static final Logger LOG = LoggerFactory.getLogger(PipelinesDataflowModelParser.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory())
Expand All @@ -35,9 +34,11 @@ public PipelinesDataflowModelParser(final PipelineConfigurationReader pipelineCo
this.pipelineConfigurationReader = pipelineConfigurationReader;
}


public PipelinesDataFlowModel parseConfiguration() {
final List<PipelinesDataFlowModel> pipelinesDataFlowModels = parseStreamsToPipelinesDataFlowModel();
return mergePipelinesDataModels(pipelinesDataFlowModels);
PipelinesDataFlowModel pipelinesDataFlowModel = mergePipelinesDataModels(pipelinesDataFlowModels);
return pipelinesDataFlowModel;
}

private void validateDataPrepperVersion(final DataPrepperVersion version) {
Expand Down Expand Up @@ -89,4 +90,5 @@ private PipelinesDataFlowModel mergePipelinesDataModels(
return pipelineExtensionsList.isEmpty() ? new PipelinesDataFlowModel(pipelinesDataFlowModelMap) :
new PipelinesDataFlowModel(pipelineExtensionsList.get(0), pipelinesDataFlowModelMap);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.pipeline.parser.rule;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.PathNotFoundException;
import com.jayway.jsonpath.ReadContext;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel;
import org.opensearch.dataprepper.pipeline.parser.transformer.TransformersFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;

public class RuleEvaluator {

private static final Logger LOG = LoggerFactory.getLogger(RuleEvaluator.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
private final TransformersFactory transformersFactory;
private String PLUGIN_NAME = null;

public RuleEvaluator(TransformersFactory transformersFactory) {
this.transformersFactory = transformersFactory;
}

public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelineModel) {
return isDocDBSource(pipelineModel);
}

/**
* Evaluates model based on pre defined rules and
* result contains the name of the pipeline that will need transformation,
* evaluated boolean result and the corresponding template model
* Assumption: only one pipeline can have transformation.
* @param pipelinesModel
* @return RuleEvaluatorResult
*/
private RuleEvaluatorResult isDocDBSource(PipelinesDataFlowModel pipelinesModel) {
PLUGIN_NAME = "documentdb";
String pluginRulesPath = transformersFactory.getPluginRuleFileLocation(PLUGIN_NAME);
Map<String, PipelineModel> pipelines = pipelinesModel.getPipelines();

for (Map.Entry<String, PipelineModel> entry : pipelines.entrySet()) {
try {
String pipelineJson = OBJECT_MAPPER.writeValueAsString(entry);
if (evaluate(pipelineJson, pluginRulesPath)) {
LOG.debug("Rule path {} is evaluated true for pipelineJson {}",pluginRulesPath, pipelineJson);

String templateFilePath = transformersFactory.getPluginTemplateFileLocation(PLUGIN_NAME);
PipelineTemplateModel templateModel = yamlMapper.readValue(new File(templateFilePath),
PipelineTemplateModel.class);
LOG.debug("Chosen template file {}",templateFilePath);

return RuleEvaluatorResult.builder()
.withEvaluatedResult(true)
.withPipelineTemplateModel(templateModel)
.withPipelineName(entry.getKey())
.build();
}
} catch (JsonProcessingException e) {
LOG.error("Error processing json");
throw new RuntimeException(e);
} catch (IOException e) {
LOG.error("Error reading file");
throw new RuntimeException(e);
}
}
return RuleEvaluatorResult.builder()
.withEvaluatedResult(false)
.withPipelineName(null)
.withPipelineTemplateModel(null)
.build();
}

private Boolean evaluate(String pipelinesJson,
String rulePath) {

Configuration parseConfig = Configuration.builder()
.jsonProvider(new JacksonJsonProvider())
.mappingProvider(new JacksonMappingProvider())
.options(Option.AS_PATH_LIST)
.build();
ParseContext parseContext = JsonPath.using(parseConfig);
ReadContext readPathContext = parseContext.parse(pipelinesJson);

try {
RuleTransformerModel rulesModel = yamlMapper.readValue(new File(rulePath), RuleTransformerModel.class);
List<String> rules = rulesModel.getApplyWhen();
for (String rule : rules) {
Object result = readPathContext.read(rule);
}
} catch (IOException e) {
LOG.warn("Error reading file {}", rulePath);
return false;
} catch (PathNotFoundException e) {
LOG.warn("Path not found {}", rulePath);
return false;
}
return true;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.pipeline.parser.rule;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineTemplateModel;

@Builder(setterPrefix = "with")
@Getter
@AllArgsConstructor
public class RuleEvaluatorResult {

private boolean evaluatedResult;

private String pipelineName;

private PipelineTemplateModel pipelineTemplateModel;

public RuleEvaluatorResult() {

}
}
Loading

0 comments on commit e590fde

Please sign in to comment.