diff --git a/internal/elasticsearch/ingest/datastream.go b/internal/elasticsearch/ingest/datastream.go index 1f73515b0..6b56d7e37 100644 --- a/internal/elasticsearch/ingest/datastream.go +++ b/internal/elasticsearch/ingest/datastream.go @@ -8,7 +8,6 @@ import ( "bytes" "fmt" "io" - "log" "net/http" "os" "path/filepath" @@ -39,12 +38,6 @@ type RoutingRule struct { Rules []Rule `yaml:"rules"` } -type ESIngestPipeline struct { - Description string `yaml:"description"` - Processors []map[string]interface{} `yaml:"processors"` - AdditionalFields map[string]interface{} `yaml:",inline"` -} - type RerouteProcessor struct { Tag string `yaml:"tag"` If string `yaml:"if"` @@ -95,37 +88,21 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er c = ingestPipelineTag.ReplaceAllFunc(c, func(found []byte) []byte { s := strings.Split(string(found), `"`) if len(s) != 3 { - log.Fatalf("invalid IngestPipeline tag in template (path: %s)", path) + err = fmt.Errorf("invalid IngestPipeline tag in template (path: %s)", path) + return nil } pipelineTag := s[1] return []byte(getPipelineNameWithNonce(pipelineTag, nonce)) }) - - // Unmarshal the YAML data into a ESIngestPipeline struct - var esPipeline ESIngestPipeline - err = yaml.Unmarshal(c, &esPipeline) if err != nil { - return nil, fmt.Errorf("failed to unmarshal ingest pipeline YAML data (path: %s): %w", path, err) + return nil, err } - // read routing_rules.yml and convert it into reroute processors in ingest pipeline - rerouteProcessors, err := loadRoutingRuleFile(dataStreamPath) + c, err = addRerouteProcessors(c, dataStreamPath, path) if err != nil { - log.Fatalf("failed loading routing_rules.yml: %v", err) + return nil, err } - // only attach routing_rules.yml reroute processors after the default pipeline - filename := filepath.Base(path) - if filename == defaultPipelineJSON || filename == defaultPipelineYML { - esPipeline.Processors = append(esPipeline.Processors, rerouteProcessors...) - } - - c, err = yaml.Marshal(esPipeline) - if err != nil { - log.Fatalf("Failed to marshal modified ingest pipeline YAML data: %v", err) - } - - // put routing rules into processors name := filepath.Base(path) pipelines = append(pipelines, Pipeline{ Path: path, @@ -137,6 +114,50 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er return pipelines, nil } +func addRerouteProcessors(pipeline []byte, dataStreamPath, path string) ([]byte, error) { + // Only attach routing_rules.yml reroute processors after the default pipeline + filename := filepath.Base(path) + if filename != defaultPipelineJSON && filename != defaultPipelineYML { + return pipeline, nil + } + + // Read routing_rules.yml and convert it into reroute processors in ingest pipeline + rerouteProcessors, err := loadRoutingRuleFile(dataStreamPath) + if err != nil { + return nil, fmt.Errorf("failed loading routing rules: %v", err) + } + if len(rerouteProcessors) == 0 { + return pipeline, nil + } + + var yamlPipeline map[string]any + err = yaml.Unmarshal(pipeline, &yamlPipeline) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal ingest pipeline YAML data (path: %s): %w", path, err) + } + + var processors []any + v, found := yamlPipeline["processors"] + if found { + list, ok := v.([]any) + if !ok { + return nil, fmt.Errorf("unexpected processors type, expected []any, found %T", v) + } + processors = list + } + for _, p := range rerouteProcessors { + processors = append(processors, p) + } + yamlPipeline["processors"] = processors + + pipeline, err = yaml.Marshal(yamlPipeline) + if err != nil { + return nil, fmt.Errorf("failed to marshal modified ingest pipeline YAML data: %v", err) + } + + return pipeline, nil +} + func loadRoutingRuleFile(dataStreamPath string) ([]map[string]interface{}, error) { routingRulePath := filepath.Join(dataStreamPath, "routing_rules.yml") c, err := os.ReadFile(routingRulePath) diff --git a/internal/testrunner/runners/system/runner.go b/internal/testrunner/runners/system/runner.go index d58b2a12a..ab5359f7a 100644 --- a/internal/testrunner/runners/system/runner.go +++ b/internal/testrunner/runners/system/runner.go @@ -671,12 +671,17 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext // when reroute processors are used, expectedDatasets should be set depends on the processor config var expectedDatasets []string for _, pipeline := range r.pipelines { - var esIngestPipeline ingest.ESIngestPipeline + var esIngestPipeline map[string]any err = yaml.Unmarshal(pipeline.Content, &esIngestPipeline) if err != nil { return nil, fmt.Errorf("unmarshalling ingest pipeline content failed: %w", err) } - for _, processor := range esIngestPipeline.Processors { + processors, _ := esIngestPipeline["processors"].([]any) + for _, p := range processors { + processor, ok := p.(map[string]any) + if !ok { + return nil, fmt.Errorf("unexpected processor %+v", p) + } if reroute, ok := processor["reroute"]; ok { if rerouteP, ok := reroute.(ingest.RerouteProcessor); ok { expectedDatasets = append(expectedDatasets, rerouteP.Dataset...)