Skip to content

Commit

Permalink
add config validation
Browse files Browse the repository at this point in the history
  • Loading branch information
leehinman committed Sep 28, 2023
1 parent 9222b55 commit b2d5c98
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
21 changes: 18 additions & 3 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,8 +773,8 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error unpacking config data: %w", err)
}

if err := mergeOutputQueueSettings(&b.Config); err != nil {
return fmt.Errorf("could not merge output queue settings: %w", err)
if err := promoteOutputQueueSettings(&b.Config); err != nil {
return fmt.Errorf("could not promote output queue settings: %w", err)
}

if err := features.UpdateFromConfig(b.RawConfig); err != nil {
Expand Down Expand Up @@ -1471,16 +1471,31 @@ func sanitizeIPs(ips []string) []string {
return validIPs
}

func mergeOutputQueueSettings(bc *beatConfig) error {
func promoteOutputQueueSettings(bc *beatConfig) error {
if bc.Output.IsSet() && bc.Output.Config().Enabled() {
pc := pipeline.Config{}
err := bc.Output.Config().Unpack(&pc)
if err != nil {
return fmt.Errorf("error unpacking output queue settings: %w", err)
}
if pc.Queue.IsSet() {
logp.Info("global queue settings replaced with output queue settings")
bc.Pipeline.Queue = pc.Queue
}
}
return nil
}

func (bc *beatConfig) Validate() error {
if bc.Output.IsSet() && bc.Output.Config().Enabled() {
pc := pipeline.Config{}
err := bc.Output.Config().Unpack(&pc)
if err != nil {
return fmt.Errorf("error unpacking output queue settings: %w", err)
}
if bc.Pipeline.Queue.IsSet() && pc.Queue.IsSet() {
return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed")
}
}
return nil
}
15 changes: 10 additions & 5 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,11 @@ func (r *outputReloaderMock) Reload(
return nil
}

func TestMergeOutputQueueSettings(t *testing.T) {
func TestPromoteOutputQueueSettings(t *testing.T) {
tests := map[string]struct {
input []byte
memEvents int
input []byte
memEvents int
expectValidationError bool
}{
"blank": {input: []byte(""),
memEvents: 4096},
Expand Down Expand Up @@ -322,7 +323,7 @@ output:
mem:
events: 8096
`),
memEvents: 8096},
expectValidationError: true},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
Expand All @@ -331,9 +332,13 @@ output:

config := beatConfig{}
err = cfg.Unpack(&config)
if tc.expectValidationError {
require.Error(t, err)
return
}
require.NoError(t, err)

err = mergeOutputQueueSettings(&config)
err = promoteOutputQueueSettings(&config)
require.NoError(t, err)

ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config())
Expand Down

0 comments on commit b2d5c98

Please sign in to comment.