Skip to content

Commit

Permalink
MAINT: merge dataflow model instead of files (#3290)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Aug 31, 2023
1 parent ececbe0 commit f15e582
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -14,9 +15,8 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -35,18 +35,8 @@ public PipelinesDataflowModelParser(final String pipelineConfigurationFileLocati
}

/**
 * Parses the pipeline configuration located at {@code pipelineConfigurationFileLocation}
 * and returns it as a single {@link PipelinesDataFlowModel}.
 *
 * NOTE(review): this text comes from a rendered diff without +/- markers, so two bodies
 * are interleaved below. The try block looks like the pre-change implementation and the
 * final two statements look like its replacement; confirm against the repository.
 *
 * @return the pipelines data flow model built from the configuration
 */
public PipelinesDataFlowModel parseConfiguration() {
// Presumably the removed version: deserialize one merged stream, check its version, return it.
try (final InputStream mergedPipelineConfigurationFiles = mergePipelineConfigurationFiles()) {
final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(mergedPipelineConfigurationFiles,
PipelinesDataFlowModel.class);

final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion();
validateDataPrepperVersion(version);

return pipelinesDataFlowModel;
} catch (IOException e) {
// Any I/O or deserialization failure is logged and rethrown as a ParseException carrying the cause.
LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation);
throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e);
}
// Presumably the added version: parse every configuration file into its own model, then merge the models.
final List<PipelinesDataFlowModel> pipelinesDataFlowModels = parsePipelineConfigurationFiles();
return mergePipelinesDataModels(pipelinesDataFlowModels);
}

private void validateDataPrepperVersion(final DataPrepperVersion version) {
Expand All @@ -57,38 +47,61 @@ private void validateDataPrepperVersion(final DataPrepperVersion version) {
}
}

/*
 * Resolves pipelineConfigurationFileLocation to either a single file or a directory of
 * YAML files and loads the configuration(s) found there.
 *
 * NOTE(review): this text comes from a rendered diff without +/- markers. Two signatures and
 * two sets of statements are interleaved: the InputStream-based lines look like the pre-change
 * mergePipelineConfigurationFiles(), the List<PipelinesDataFlowModel>-based lines look like the
 * post-change parsePipelineConfigurationFiles(); confirm against the repository.
 *
 * Both variants throw ParseException when the location is neither a file nor a directory,
 * or when the directory yields nothing usable.
 */
private InputStream mergePipelineConfigurationFiles() throws IOException {
private List<PipelinesDataFlowModel> parsePipelineConfigurationFiles() {
final File configurationLocation = new File(pipelineConfigurationFileLocation);

// Case 1: the location is a single file.
if (configurationLocation.isFile()) {
return new FileInputStream(configurationLocation);
// List variant: parse the one file; a null result is filtered out, leaving an empty list.
return Stream.of(configurationLocation).map(this::parsePipelineConfigurationFile)
.filter(Objects::nonNull).collect(Collectors.toList());
// Case 2: the location is a directory; only entries whose names end in .yaml or .yml are considered.
} else if (configurationLocation.isDirectory()) {
FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml"));
List<InputStream> configurationFiles = Stream.of(configurationLocation.listFiles(yamlFilter))
.map(file -> {
InputStream inputStream;
try {
inputStream = new FileInputStream(file);
LOG.info("Reading pipeline configuration from {}", file.getName());
} catch (FileNotFoundException e) {
// A file that cannot be opened is mapped to null and dropped by the nonNull filter below.
inputStream = null;
LOG.warn("Pipeline configuration file {} not found", file.getName());
}
return inputStream;
})
List<PipelinesDataFlowModel> pipelinesDataFlowModels = Stream.of(configurationLocation.listFiles(yamlFilter))
.map(this::parsePipelineConfigurationFile)
.filter(Objects::nonNull)
.collect(Collectors.toList());

// Nothing survived the filter: treat it as a missing configuration.
if (configurationFiles.isEmpty()) {
if (pipelinesDataFlowModels.isEmpty()) {
LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation);
throw new ParseException(
format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation));
}

return new SequenceInputStream(Collections.enumeration(configurationFiles));
return pipelinesDataFlowModels;
// Case 3: the location is neither a file nor a directory.
} else {
LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation);
throw new ParseException(format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation));
}
}

/**
 * Reads one pipeline configuration file, deserializes it into a
 * {@link PipelinesDataFlowModel} and passes its Data Prepper version to
 * {@code validateDataPrepperVersion}.
 *
 * @param pipelineConfigurationFile the configuration file to read
 * @return the deserialized model, or {@code null} if the file could not be found
 * @throws ParseException if reading or deserializing fails with any other {@link IOException}
 */
private PipelinesDataFlowModel parsePipelineConfigurationFile(final File pipelineConfigurationFile) {
    try (final InputStream configStream = new FileInputStream(pipelineConfigurationFile)) {
        LOG.info("Reading pipeline configuration from {}", pipelineConfigurationFile.getName());

        final PipelinesDataFlowModel parsedModel =
                OBJECT_MAPPER.readValue(configStream, PipelinesDataFlowModel.class);
        validateDataPrepperVersion(parsedModel.getDataPrepperVersion());
        return parsedModel;
    } catch (FileNotFoundException e) {
        // A missing file is tolerated: warn and signal "nothing parsed" with null.
        LOG.warn("Pipeline configuration file {} not found", pipelineConfigurationFile.getName());
        return null;
    } catch (IOException e) {
        // The message reports the configured location, not the individual file being read.
        LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation);
        throw new ParseException(
                format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e);
    }
}

/**
 * Combines the pipelines of several models into one {@link PipelinesDataFlowModel}.
 *
 * The pipeline entries of every input model are collected into a single unmodifiable map
 * keyed by the entry key. No merge function is supplied, so the collector rejects a key
 * that occurs in more than one model.
 *
 * @param pipelinesDataFlowModels the models whose pipelines should be combined
 * @return a new model constructed from the combined pipeline map
 */
private PipelinesDataFlowModel mergePipelinesDataModels(
        final List<PipelinesDataFlowModel> pipelinesDataFlowModels) {
    final Stream<Map.Entry<String, PipelineModel>> allPipelineEntries = pipelinesDataFlowModels.stream()
            .flatMap(model -> model.getPipelines().entrySet().stream());

    final Map<String, PipelineModel> combinedPipelines = allPipelineEntries.collect(
            Collectors.toUnmodifiableMap(entry -> entry.getKey(), entry -> entry.getValue()));

    return new PipelinesDataFlowModel(combinedPipelines);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand Down Expand Up @@ -46,6 +47,7 @@ void parseConfiguration_from_directory_with_multiple_files_creates_the_correct_m
final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration();
assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(),
equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES));
assertThat(actualPipelinesDataFlowModel.getDataPrepperVersion(), nullValue());
}

@Test
Expand Down

0 comments on commit f15e582

Please sign in to comment.