From f15e5824e44704baceeb561e9f75975cc1cb7cff Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Thu, 31 Aug 2023 15:38:14 -0500 Subject: [PATCH] MAINT: merge dataflow model instead of files (#3290) --------- Signed-off-by: George Chen --- .../parser/PipelinesDataflowModelParser.java | 73 +++++++++++-------- .../PipelinesDataflowModelParserTest.java | 2 + 2 files changed, 45 insertions(+), 30 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java index 0cfd8fa257..c451d46c49 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,9 +15,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.io.SequenceInputStream; -import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -35,18 +35,8 @@ public PipelinesDataflowModelParser(final String pipelineConfigurationFileLocati } public PipelinesDataFlowModel parseConfiguration() { - try (final InputStream mergedPipelineConfigurationFiles = mergePipelineConfigurationFiles()) { - final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(mergedPipelineConfigurationFiles, - PipelinesDataFlowModel.class); - - final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion(); - validateDataPrepperVersion(version); - - return pipelinesDataFlowModel; - } catch (IOException e) { - LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation); - throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e); - } + final List pipelinesDataFlowModels = parsePipelineConfigurationFiles(); + return mergePipelinesDataModels(pipelinesDataFlowModels); } private void validateDataPrepperVersion(final DataPrepperVersion version) { @@ -57,38 +47,61 @@ private void validateDataPrepperVersion(final DataPrepperVersion version) { } } - private InputStream mergePipelineConfigurationFiles() throws IOException { + private List parsePipelineConfigurationFiles() { final File configurationLocation = new File(pipelineConfigurationFileLocation); if (configurationLocation.isFile()) { - return new FileInputStream(configurationLocation); + return Stream.of(configurationLocation).map(this::parsePipelineConfigurationFile) + .filter(Objects::nonNull).collect(Collectors.toList()); } else if (configurationLocation.isDirectory()) { FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml")); - List configurationFiles = Stream.of(configurationLocation.listFiles(yamlFilter)) - .map(file -> { - InputStream inputStream; - try { - inputStream = new FileInputStream(file); - LOG.info("Reading pipeline configuration from {}", file.getName()); - } catch (FileNotFoundException e) { - inputStream = null; - LOG.warn("Pipeline configuration file {} not found", file.getName()); - } - return inputStream; - }) + List pipelinesDataFlowModels = Stream.of(configurationLocation.listFiles(yamlFilter)) + .map(this::parsePipelineConfigurationFile) .filter(Objects::nonNull) .collect(Collectors.toList()); - if (configurationFiles.isEmpty()) { + if (pipelinesDataFlowModels.isEmpty()) { LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); throw new ParseException( format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); } - return new SequenceInputStream(Collections.enumeration(configurationFiles)); + return pipelinesDataFlowModels; } else { LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); throw new ParseException(format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); } } + + private PipelinesDataFlowModel parsePipelineConfigurationFile(final File pipelineConfigurationFile) { + try (final InputStream pipelineConfigurationInputStream = new FileInputStream(pipelineConfigurationFile)) { + LOG.info("Reading pipeline configuration from {}", pipelineConfigurationFile.getName()); + final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(pipelineConfigurationInputStream, + PipelinesDataFlowModel.class); + + final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion(); + validateDataPrepperVersion(version); + + return pipelinesDataFlowModel; + } catch (IOException e) { + if (e instanceof FileNotFoundException) { + LOG.warn("Pipeline configuration file {} not found", pipelineConfigurationFile.getName()); + return null; + } + LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation); + throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e); + } + } + + private PipelinesDataFlowModel mergePipelinesDataModels( + final List pipelinesDataFlowModels) { + final Map pipelinesDataFlowModelMap = pipelinesDataFlowModels.stream() + .map(PipelinesDataFlowModel::getPipelines) + .flatMap(pipelines -> pipelines.entrySet().stream()) + .collect(Collectors.toUnmodifiableMap( + Map.Entry::getKey, + Map.Entry::getValue + )); + return new PipelinesDataFlowModel(pipelinesDataFlowModelMap); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java index aaa79eba34..a5a13c0753 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java @@ -6,6 +6,7 @@ import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -46,6 +47,7 @@ void parseConfiguration_from_directory_with_multiple_files_creates_the_correct_m final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); + assertThat(actualPipelinesDataFlowModel.getDataPrepperVersion(), nullValue()); } @Test