Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Srikanth Govindarajan <[email protected]>
  • Loading branch information
srikanthjg committed Aug 7, 2024
1 parent c80bd2d commit 4a7d5d4
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;

Expand All @@ -47,9 +47,9 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin
String pipelineJson = OBJECT_MAPPER.writeValueAsString(entry);
RuleFileEvaluation ruleFileEvaluation = evaluate(pipelineJson);

if (ruleFileEvaluation.result) {
String pluginName = ruleFileEvaluation.pluginName;
LOG.info("Applying rule {}",ruleFileEvaluation.ruleFileName.toString());
if (ruleFileEvaluation.getResult()) {
String pluginName = ruleFileEvaluation.getPluginName();
LOG.info("Applying rule {}",ruleFileEvaluation.getRuleFileName().toString());
LOG.info("Rule for {} is evaluated true for pipelineJson {}", pluginName, pipelineJson);

InputStream templateStream = transformersFactory.getPluginTemplateFileStream(pluginName);
Expand Down Expand Up @@ -82,49 +82,47 @@ public RuleEvaluatorResult isTransformationNeeded(PipelinesDataFlowModel pipelin
}

private RuleFileEvaluation evaluate(String pipelinesJson) {

Configuration parseConfig = Configuration.builder()
.jsonProvider(new JacksonJsonNodeJsonProvider())
.mappingProvider(new JacksonMappingProvider())
.options(Option.SUPPRESS_EXCEPTIONS)
.build();

RuleTransformerModel rulesModel = null;
InputStream ruleStream = null;
try {
List<Path> ruleFiles = transformersFactory.getRuleFiles();

for (Path ruleFile : ruleFiles) {
try {
Collection<RuleInputStream> ruleStreams = transformersFactory.loadRules();

ruleStream = transformersFactory.readRuleFile(ruleFile);
if(ruleStream == null){
continue;
}
rulesModel = yamlMapper.readValue(ruleStream, RuleTransformerModel.class);
List<String> rules = rulesModel.getApplyWhen();
String pluginName = rulesModel.getPluginName();
boolean allRulesValid = true;

for (String rule : rules) {
try {
JsonNode result = JsonPath.using(parseConfig).parse(pipelinesJson).read(rule);
if (result == null || result.size()==0) {
for (RuleInputStream ruleStream : ruleStreams) {
try {
rulesModel = yamlMapper.readValue(ruleStream.getInputStream(), RuleTransformerModel.class);
List<String> rules = rulesModel.getApplyWhen();
String pluginName = rulesModel.getPluginName();
boolean allRulesValid = true;

for (String rule : rules) {
try {
JsonNode result = JsonPath.using(parseConfig).parse(pipelinesJson).read(rule);
if (result == null || result.size() == 0) {
allRulesValid = false;
break;
}
} catch (PathNotFoundException e) {
LOG.debug("Json Path not found for {}", ruleStream.getName());
allRulesValid = false;
break;
}
} catch (PathNotFoundException e) {
LOG.debug("Json Path not found for {}", ruleFile.getFileName().toString());
allRulesValid = false;
break;
}
}

if (allRulesValid) {
return RuleFileEvaluation.builder()
.withPluginName(pluginName)
.withRuleFileName(ruleFile.getFileName().toString())
.withResult(true)
.build();
if (allRulesValid) {
return RuleFileEvaluation.builder()
.withPluginName(pluginName)
.withRuleFileName(ruleStream.getName())
.withResult(true)
.build();
}
} finally {
ruleStream.close();
}
}

Expand All @@ -137,19 +135,13 @@ private RuleFileEvaluation evaluate(String pipelinesJson) {
.build();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (ruleStream != null) {
try {
ruleStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

return RuleFileEvaluation.builder()
.withPluginName(null)
.withRuleFileName(null)
.withResult(false)
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
@AllArgsConstructor
@Data
public class RuleFileEvaluation {
Boolean result;
String ruleFileName;
String pluginName;
private Boolean result;
private String ruleFileName;
private String pluginName;

public RuleFileEvaluation() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.opensearch.dataprepper.pipeline.parser.rule;

import java.io.IOException;
import java.io.InputStream;

public class RuleInputStream {
private String name;
private InputStream inputStream;

public RuleInputStream(String name, InputStream inputStream) {
this.name = name;
this.inputStream = inputStream;
}

public String getName() {
return name;
}

public InputStream getInputStream() {
return inputStream;
}

public void close() {
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import static org.opensearch.dataprepper.pipeline.parser.PipelineTransformationConfiguration.RULES_DIRECTORY_PATH;
import static org.opensearch.dataprepper.pipeline.parser.PipelineTransformationConfiguration.TEMPLATES_DIRECTORY_PATH;
import org.opensearch.dataprepper.pipeline.parser.rule.RuleInputStream;

import javax.inject.Named;
import java.io.File;
Expand All @@ -22,6 +23,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -68,6 +70,51 @@ public InputStream getPluginTemplateFileStream(String pluginName) {
return filestream;
}

public Collection<RuleInputStream> loadRules() {
URI uri;
try {
uri = getClass().getClassLoader().getResource("rules").toURI();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}

List<RuleInputStream> ruleInputStreams = new ArrayList<>();

if ("jar".equals(uri.getScheme())) {
try (FileSystem fileSystem = FileSystems.newFileSystem(uri, Collections.emptyMap())) {
Path rulesFolderPath = fileSystem.getPath("rules");
try (Stream<Path> paths = Files.walk(rulesFolderPath)) {
paths.filter(Files::isRegularFile)
.forEach(path -> {
InputStream ruleStream = getClass().getClassLoader().getResourceAsStream("rules" + "/" + path.getFileName().toString());
if (ruleStream != null) {
ruleInputStreams.add(new RuleInputStream(path.getFileName().toString(), ruleStream));
}
});
}
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
Path rulesFolderPath = Paths.get(uri);
try (Stream<Path> paths = Files.walk(rulesFolderPath)) {
paths.filter(Files::isRegularFile)
.forEach(path -> {
try {
InputStream ruleStream = Files.newInputStream(path);
ruleInputStreams.add(new RuleInputStream(path.getFileName().toString(), ruleStream));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return ruleInputStreams;
}


public List<Path> getRuleFiles() {
// Get the URI of the rules folder
URI uri = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
Expand All @@ -23,7 +22,6 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -41,10 +39,10 @@ void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() throws IOExc
String pluginName = "documentdb";
String pipelineName = "test-pipeline";
Map<String, Object> sourceOptions = new HashMap<>();
Map<String, Object> s3_bucket = new HashMap<>();
s3_bucket.put("s3_bucket", "bucket-name");
Map<String, Object> s3Bucket = new HashMap<>();
s3Bucket.put("s3_bucket", "bucket-name");
List<Map<String, Object>> collections = new ArrayList<>();
collections.add(s3_bucket);
collections.add(s3Bucket);
sourceOptions.put("collections", collections);
final PluginModel source = new PluginModel(pluginName, sourceOptions);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", null));
Expand All @@ -56,13 +54,12 @@ void test_isTransformationNeeded_ForDocDBSource_ShouldReturn_True() throws IOExc

TransformersFactory transformersFactory = mock(TransformersFactory.class);

Path ruleFile = mock(Path.class);
List<Path> ruleFiles = Collections.singletonList(ruleFile);
InputStream ruleStream = new FileInputStream(ruleDocDBFilePath);
InputStream templateStream = new FileInputStream(ruleDocDBTemplatePath);
when(ruleFile.getFileName()).thenReturn(Paths.get("documentdb-rule.yaml").getFileName());
when(transformersFactory.getRuleFiles()).thenReturn(ruleFiles);
when(transformersFactory.readRuleFile(eq(ruleFile))).thenReturn(ruleStream);
RuleInputStream ruleInputStream = new RuleInputStream(Paths.get(ruleDocDBFilePath).getFileName().toString(), ruleStream);

List<RuleInputStream> ruleStreams = Collections.singletonList(ruleInputStream);
when(transformersFactory.loadRules()).thenReturn(ruleStreams);
when(transformersFactory.getPluginTemplateFileStream(pluginName)).thenReturn(templateStream);

RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory);
Expand All @@ -79,10 +76,10 @@ void test_isTransformationNeeded_ForOtherSource_ShouldReturn_False() throws IOEx
String pluginName = "http";
String pipelineName = "test-pipeline";
Map<String, Object> sourceOptions = new HashMap<>();
Map<String, Object> s3_bucket = new HashMap<>();
s3_bucket.put("s3_bucket", "bucket-name");
Map<String, Object> s3Bucket = new HashMap<>();
s3Bucket.put("s3_bucket", "bucket-name");
List<Map<String, Object>> collections = new ArrayList<>();
collections.add(s3_bucket);
collections.add(s3Bucket);
sourceOptions.put("collections", collections);
final PluginModel source = new PluginModel(pluginName, sourceOptions);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", null));
Expand Down Expand Up @@ -118,7 +115,7 @@ void testThrowsExceptionOnFileError() {
final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(
(PipelineExtensions) null, Collections.singletonMap(pipelineName, pipelineModel));

when(transformersFactory.getRuleFiles()).thenThrow(new RuntimeException("File not found"));
when(transformersFactory.loadRules()).thenThrow(new RuntimeException("File not found"));

RuleEvaluator ruleEvaluator = new RuleEvaluator(transformersFactory);

Expand Down
Loading

0 comments on commit 4a7d5d4

Please sign in to comment.