diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 925246b8038b..a0d830bf0b0f 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -64,7 +64,6 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/plugin" "github.com/elastic/beats/v7/libbeat/pprof" - "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" @@ -840,10 +839,6 @@ func (b *Beat) configure(settings Settings) error { if err != nil { return err } - err = checkAgentDiskQueue(&b.Config) - if err != nil { - return err - } if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil { return err @@ -1508,31 +1503,24 @@ func promoteOutputQueueSettings(bc *beatConfig) error { func (bc *beatConfig) Validate() error { if bc.Output.IsSet() && bc.Output.Config().Enabled() { - pc := pipeline.Config{} - err := bc.Output.Config().Unpack(&pc) + outputPC := pipeline.Config{} + err := bc.Output.Config().Unpack(&outputPC) if err != nil { return fmt.Errorf("error unpacking output queue settings: %w", err) } - if bc.Pipeline.Queue.IsSet() && pc.Queue.IsSet() { + if bc.Pipeline.Queue.IsSet() && outputPC.Queue.IsSet() { return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed") } + //elastic-agent doesn't support disk queue yet + if bc.Management.Enabled() && outputPC.Queue.Config().Enabled() && outputPC.Queue.Name() == diskqueue.QueueType { + return fmt.Errorf("disk queue is not supported when management is enabled") + } } - return nil -} -// checkAgentDiskQueue should be run after management.NewManager() so -// that publisher.UnderAgent will be set with correct value -func checkAgentDiskQueue(bc *beatConfig) error { - //restriction is only if under agent - if !publisher.UnderAgent() { - return nil - } - //default queue settings are always allowed - if !bc.Pipeline.Queue.IsSet() { - return nil - } - if bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType { - return fmt.Errorf("disk queue is not supported under elastic-agent") + //elastic-agent doesn't support disk queue yet + if bc.Management.Enabled() && bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType { + return fmt.Errorf("disk queue is not supported when management is enabled") } + return nil } diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index 4c8443dfb72b..6f94d8fd62d2 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -310,6 +310,46 @@ output: events: 8096 `), memEvents: 8096}, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + cfg, err := yaml.NewConfig(tc.input) + require.NoError(t, err) + + config := beatConfig{} + err = cfg.Unpack(&config) + if tc.expectValidationError { + require.Error(t, err) + return + } + require.NoError(t, err) + + err = promoteOutputQueueSettings(&config) + require.NoError(t, err) + + ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config()) + require.NoError(t, err) + require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config()) + }) + } +} + +func TestValidateBeatConfig(t *testing.T) { + tests := map[string]struct { + input []byte + expectValidationError string + }{ + "blank": {input: []byte(""), + expectValidationError: "", + }, + "defaults": {input: []byte(` +name: mockbeat +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + expectValidationError: ""}, "topAndOutputLevelQueue": {input: []byte(` name: mockbeat queue: @@ -323,27 +363,59 @@ output: mem: events: 8096 `), - expectValidationError: true}, + expectValidationError: "top level queue and output level queue settings defined, only one is allowed accessing config"}, + "managementTopLevelDiskQueue": {input: []byte(` +name: mockbeat +management: + enabled: true +queue: + disk: + max_size: 1G +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + expectValidationError: "disk queue is not supported when management is enabled accessing config"}, + "managementOutputLevelDiskQueue": {input: []byte(` +name: mockbeat +management: + enabled: true +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + disk: + max_size: 1G +`), + expectValidationError: "disk queue is not supported when management is enabled accessing config"}, + "managementFalseOutputLevelDiskQueue": {input: []byte(` +name: mockbeat +management: + enabled: false +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + disk: + max_size: 1G +`), + expectValidationError: ""}, } for name, tc := range tests { t.Run(name, func(t *testing.T) { cfg, err := yaml.NewConfig(tc.input) require.NoError(t, err) - config := beatConfig{} err = cfg.Unpack(&config) - if tc.expectValidationError { + if tc.expectValidationError != "" { require.Error(t, err) - return + require.Equal(t, tc.expectValidationError, err.Error()) + } else { + require.NoError(t, err) } - require.NoError(t, err) - - err = promoteOutputQueueSettings(&config) - require.NoError(t, err) - - ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config()) - require.NoError(t, err) - require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config()) }) } }