Skip to content

Commit

Permalink
Keep all fields of pipelines when adding reroute rules (#1385)
Browse files Browse the repository at this point in the history
Avoid modifying pipelines when unmarshalling them to add reroute rules. To do so,
unmarshal them into a generic map instead of into a struct with a limited set of fields.
  • Loading branch information
jsoriano authored Aug 9, 2023
1 parent 1586c5d commit 152bbac
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 30 deletions.
77 changes: 49 additions & 28 deletions internal/elasticsearch/ingest/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bytes"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -39,12 +38,6 @@ type RoutingRule struct {
Rules []Rule `yaml:"rules"`
}

// ESIngestPipeline is the YAML representation of an ingest pipeline used when
// decoding and re-encoding pipeline files.
type ESIngestPipeline struct {
// Description maps to the "description" key.
Description string `yaml:"description"`
// Processors maps to the "processors" key; each entry is kept as a generic map.
Processors []map[string]interface{} `yaml:"processors"`
// AdditionalFields is tagged as inline; presumably it collects the remaining
// top-level keys (confirm against the YAML library's inline semantics).
AdditionalFields map[string]interface{} `yaml:",inline"`
}

type RerouteProcessor struct {
Tag string `yaml:"tag"`
If string `yaml:"if"`
Expand Down Expand Up @@ -95,37 +88,21 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
c = ingestPipelineTag.ReplaceAllFunc(c, func(found []byte) []byte {
s := strings.Split(string(found), `"`)
if len(s) != 3 {
log.Fatalf("invalid IngestPipeline tag in template (path: %s)", path)
err = fmt.Errorf("invalid IngestPipeline tag in template (path: %s)", path)
return nil
}
pipelineTag := s[1]
return []byte(getPipelineNameWithNonce(pipelineTag, nonce))
})

// Unmarshal the YAML data into a ESIngestPipeline struct
var esPipeline ESIngestPipeline
err = yaml.Unmarshal(c, &esPipeline)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal ingest pipeline YAML data (path: %s): %w", path, err)
return nil, err
}

// read routing_rules.yml and convert it into reroute processors in ingest pipeline
rerouteProcessors, err := loadRoutingRuleFile(dataStreamPath)
c, err = addRerouteProcessors(c, dataStreamPath, path)
if err != nil {
log.Fatalf("failed loading routing_rules.yml: %v", err)
return nil, err
}

// only attach routing_rules.yml reroute processors after the default pipeline
filename := filepath.Base(path)
if filename == defaultPipelineJSON || filename == defaultPipelineYML {
esPipeline.Processors = append(esPipeline.Processors, rerouteProcessors...)
}

c, err = yaml.Marshal(esPipeline)
if err != nil {
log.Fatalf("Failed to marshal modified ingest pipeline YAML data: %v", err)
}

// put routing rules into processors
name := filepath.Base(path)
pipelines = append(pipelines, Pipeline{
Path: path,
Expand All @@ -137,6 +114,50 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
return pipelines, nil
}

// addRerouteProcessors appends the reroute processors obtained from
// loadRoutingRuleFile to the "processors" list of the given pipeline.
//
// pipeline is the pipeline content, dataStreamPath is the directory handed to
// loadRoutingRuleFile, and path is the pipeline file path; its base name
// decides whether the pipeline is the default one, and it is included in
// unmarshalling errors.
//
// The content is returned untouched when the pipeline is not the default
// pipeline or when there are no reroute processors to add. Otherwise the
// pipeline is decoded into a generic map, so that no field is dropped, the
// processors are appended, and the result is marshalled back to YAML.
//
// Underlying errors are wrapped with %w so callers can inspect them with
// errors.Is and errors.As.
func addRerouteProcessors(pipeline []byte, dataStreamPath, path string) ([]byte, error) {
	// Only attach routing_rules.yml reroute processors after the default pipeline.
	filename := filepath.Base(path)
	if filename != defaultPipelineJSON && filename != defaultPipelineYML {
		return pipeline, nil
	}

	// Read routing_rules.yml and convert it into reroute processors in ingest pipeline.
	rerouteProcessors, err := loadRoutingRuleFile(dataStreamPath)
	if err != nil {
		return nil, fmt.Errorf("failed loading routing rules: %w", err)
	}
	if len(rerouteProcessors) == 0 {
		// Nothing to add: skip the decode/encode round trip and keep the
		// original bytes as they are.
		return pipeline, nil
	}

	var yamlPipeline map[string]any
	err = yaml.Unmarshal(pipeline, &yamlPipeline)
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal ingest pipeline YAML data (path: %s): %w", path, err)
	}

	// A pipeline without a "processors" key starts from an empty list.
	var processors []any
	v, found := yamlPipeline["processors"]
	if found {
		list, ok := v.([]any)
		if !ok {
			return nil, fmt.Errorf("unexpected processors type, expected []any, found %T", v)
		}
		processors = list
	}
	// Element-wise append is required: a []map[string]interface{} cannot be
	// passed to append as a []any.
	for _, p := range rerouteProcessors {
		processors = append(processors, p)
	}
	yamlPipeline["processors"] = processors

	pipeline, err = yaml.Marshal(yamlPipeline)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal modified ingest pipeline YAML data: %w", err)
	}

	return pipeline, nil
}

func loadRoutingRuleFile(dataStreamPath string) ([]map[string]interface{}, error) {
routingRulePath := filepath.Join(dataStreamPath, "routing_rules.yml")
c, err := os.ReadFile(routingRulePath)
Expand Down
9 changes: 7 additions & 2 deletions internal/testrunner/runners/system/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,12 +671,17 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext
// When reroute processors are used, expectedDatasets should be set depending on the processor config.
var expectedDatasets []string
for _, pipeline := range r.pipelines {
var esIngestPipeline ingest.ESIngestPipeline
var esIngestPipeline map[string]any
err = yaml.Unmarshal(pipeline.Content, &esIngestPipeline)
if err != nil {
return nil, fmt.Errorf("unmarshalling ingest pipeline content failed: %w", err)
}
for _, processor := range esIngestPipeline.Processors {
processors, _ := esIngestPipeline["processors"].([]any)
for _, p := range processors {
processor, ok := p.(map[string]any)
if !ok {
return nil, fmt.Errorf("unexpected processor %+v", p)
}
if reroute, ok := processor["reroute"]; ok {
if rerouteP, ok := reroute.(ingest.RerouteProcessor); ok {
expectedDatasets = append(expectedDatasets, rerouteP.Dataset...)
Expand Down

0 comments on commit 152bbac

Please sign in to comment.