Skip to content

Commit

Permalink
catch elastic-agent and disk queue at validation
Browse files Browse the repository at this point in the history
  • Loading branch information
leehinman committed Sep 29, 2023
1 parent 7e48d6f commit 476780e
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 35 deletions.
34 changes: 11 additions & 23 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ import (
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/plugin"
"github.com/elastic/beats/v7/libbeat/pprof"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
Expand Down Expand Up @@ -830,10 +829,6 @@ func (b *Beat) configure(settings Settings) error {
if err != nil {
return err
}
err = checkAgentDiskQueue(&b.Config)
if err != nil {
return err
}

if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil {
return err
Expand Down Expand Up @@ -1494,31 +1489,24 @@ func promoteOutputQueueSettings(bc *beatConfig) error {

func (bc *beatConfig) Validate() error {
if bc.Output.IsSet() && bc.Output.Config().Enabled() {
pc := pipeline.Config{}
err := bc.Output.Config().Unpack(&pc)
outputPC := pipeline.Config{}
err := bc.Output.Config().Unpack(&outputPC)
if err != nil {
return fmt.Errorf("error unpacking output queue settings: %w", err)
}
if bc.Pipeline.Queue.IsSet() && pc.Queue.IsSet() {
if bc.Pipeline.Queue.IsSet() && outputPC.Queue.IsSet() {
return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed")
}
//elastic-agent doesn't support disk queue yet
if bc.Management.Enabled() && outputPC.Queue.Config().Enabled() && outputPC.Queue.Name() == diskqueue.QueueType {
return fmt.Errorf("disk queue is not supported when management is enabled")
}
}
return nil
}

// checkAgentDiskQueue should be run after management.NewManager() so
// that publisher.UnderAgent will be set with correct value
func checkAgentDiskQueue(bc *beatConfig) error {
//restriction is only if under agent
if !publisher.UnderAgent() {
return nil
}
//default queue settings are always allowed
if !bc.Pipeline.Queue.IsSet() {
return nil
}
if bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType {
return fmt.Errorf("disk queue is not supported under elastic-agent")
//elastic-agent doesn't support disk queue yet
if bc.Management.Enabled() && bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType {
return fmt.Errorf("disk queue is not supported when management is enabled")
}

return nil
}
96 changes: 84 additions & 12 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,46 @@ output:
events: 8096
`),
memEvents: 8096},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
cfg, err := yaml.NewConfig(tc.input)
require.NoError(t, err)

config := beatConfig{}
err = cfg.Unpack(&config)
if tc.expectValidationError {
require.Error(t, err)
return
}
require.NoError(t, err)

err = promoteOutputQueueSettings(&config)
require.NoError(t, err)

ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config())
require.NoError(t, err)
require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config())
})
}
}

func TestValidateBeatConfig(t *testing.T) {
tests := map[string]struct {
input []byte
expectValidationError string
}{
"blank": {input: []byte(""),
expectValidationError: "",
},
"defaults": {input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
expectValidationError: ""},
"topAndOutputLevelQueue": {input: []byte(`
name: mockbeat
queue:
Expand All @@ -323,27 +363,59 @@ output:
mem:
events: 8096
`),
expectValidationError: true},
expectValidationError: "top level queue and output level queue settings defined, only one is allowed accessing config"},
"managementTopLevelDiskQueue": {input: []byte(`
name: mockbeat
management:
enabled: true
queue:
disk:
max_size: 1G
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
expectValidationError: "disk queue is not supported when management is enabled accessing config"},
"managementOutputLevelDiskQueue": {input: []byte(`
name: mockbeat
management:
enabled: true
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
disk:
max_size: 1G
`),
expectValidationError: "disk queue is not supported when management is enabled accessing config"},
"managementFalseOutputLevelDiskQueue": {input: []byte(`
name: mockbeat
management:
enabled: false
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
disk:
max_size: 1G
`),
expectValidationError: ""},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
cfg, err := yaml.NewConfig(tc.input)
require.NoError(t, err)

config := beatConfig{}
err = cfg.Unpack(&config)
if tc.expectValidationError {
if tc.expectValidationError != "" {
require.Error(t, err)
return
require.Equal(t, tc.expectValidationError, err.Error())
} else {
require.NoError(t, err)
}
require.NoError(t, err)

err = promoteOutputQueueSettings(&config)
require.NoError(t, err)

ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config())
require.NoError(t, err)
require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config())
})
}
}

0 comments on commit 476780e

Please sign in to comment.