Skip to content

Commit

Permalink
add support for queue settings under outputs (#36788)
Browse files Browse the repository at this point in the history
* add support for queue settings under outputs
  • Loading branch information
leehinman authored Oct 19, 2023
1 parent e39b37e commit 0bd2d73
Show file tree
Hide file tree
Showing 27 changed files with 383 additions and 125 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ is collected by it.
- Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322]
- [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506]
Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor
- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36788[36788]
- Beats will now connect to older Elasticsearch instances by default {pull}36884[36884]

*Auditbeat*
Expand Down Expand Up @@ -216,8 +217,7 @@ is collected by it.
- Added support for Okta OAuth2 provider in the httpjson input. {pull}36273[36273]
- Add support of the interval parameter in Salesforce setupaudittrail-rest fileset. {issue}35917[35917] {pull}35938[35938]
- Add device handling to Okta input package for entity analytics. {pull}36049[36049]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}36286[36286]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30916[30916] {pull}36286[36286]
- [Azure] Add input metrics to the azure-eventhub input. {pull}35739[35739]
- Reduce HTTPJSON metrics allocations. {pull}36282[36282]
- Add support for a simplified input configuraton when running under Elastic-Agent {pull}36390[36390]
Expand Down
49 changes: 49 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/elastic/beats/v7/libbeat/pprof"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/file"
Expand Down Expand Up @@ -783,6 +784,10 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error unpacking config data: %w", err)
}

if err := promoteOutputQueueSettings(&b.Config); err != nil {
return fmt.Errorf("could not promote output queue settings: %w", err)
}

if err := features.UpdateFromConfig(b.RawConfig); err != nil {
return fmt.Errorf("could not parse features: %w", err)
}
Expand Down Expand Up @@ -1482,3 +1487,47 @@ func sanitizeIPs(ips []string) []string {
}
return validIPs
}

// promoteOutputQueueSettings checks to see if the output
// configuration has queue settings defined and if so it promotes them
// to the top level queue settings. This is done to allow existing
// behavior of specifying queue settings at the top level or like
// elastic-agent that specifies queue settings under the output
func promoteOutputQueueSettings(bc *beatConfig) error {
if bc.Output.IsSet() && bc.Output.Config().Enabled() {
pc := pipeline.Config{}
err := bc.Output.Config().Unpack(&pc)
if err != nil {
return fmt.Errorf("error unpacking output queue settings: %w", err)
}
if pc.Queue.IsSet() {
logp.Info("global queue settings replaced with output queue settings")
bc.Pipeline.Queue = pc.Queue
}
}
return nil
}

func (bc *beatConfig) Validate() error {
if bc.Output.IsSet() && bc.Output.Config().Enabled() {
outputPC := pipeline.Config{}
err := bc.Output.Config().Unpack(&outputPC)
if err != nil {
return fmt.Errorf("error unpacking output queue settings: %w", err)
}
if bc.Pipeline.Queue.IsSet() && outputPC.Queue.IsSet() {
return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed")
}
//elastic-agent doesn't support disk queue yet
if bc.Management.Enabled() && outputPC.Queue.Config().Enabled() && outputPC.Queue.Name() == diskqueue.QueueType {
return fmt.Errorf("disk queue is not supported when management is enabled")
}
}

//elastic-agent doesn't support disk queue yet
if bc.Management.Enabled() && bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType {
return fmt.Errorf("disk queue is not supported when management is enabled")
}

return nil
}
164 changes: 164 additions & 0 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/go-ucfg/yaml"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -269,3 +271,165 @@ func (r *outputReloaderMock) Reload(
r.cfg = cfg
return nil
}

func TestPromoteOutputQueueSettings(t *testing.T) {
tests := map[string]struct {
input []byte
memEvents int
}{
"blank": {
input: []byte(""),
memEvents: 4096,
},
"defaults": {
input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
memEvents: 4096,
},
"topLevelQueue": {
input: []byte(`
name: mockbeat
queue:
mem:
events: 8096
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
memEvents: 8096,
},
"outputLevelQueue": {
input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
mem:
events: 8096
`),
memEvents: 8096,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
cfg, err := yaml.NewConfig(tc.input)
require.NoError(t, err)

config := beatConfig{}
err = cfg.Unpack(&config)
require.NoError(t, err)

err = promoteOutputQueueSettings(&config)
require.NoError(t, err)

ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config())
require.NoError(t, err)
require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config())
})
}
}

func TestValidateBeatConfig(t *testing.T) {
tests := map[string]struct {
input []byte
expectValidationError string
}{
"blank": {
input: []byte(""),
expectValidationError: "",
},
"defaults": {
input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
expectValidationError: "",
},
"topAndOutputLevelQueue": {
input: []byte(`
name: mockbeat
queue:
mem:
events: 2048
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
mem:
events: 8096
`),
expectValidationError: "top level queue and output level queue settings defined, only one is allowed accessing config",
},
"managementTopLevelDiskQueue": {
input: []byte(`
name: mockbeat
management:
enabled: true
queue:
disk:
max_size: 1G
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
expectValidationError: "disk queue is not supported when management is enabled accessing config",
},
"managementOutputLevelDiskQueue": {
input: []byte(`
name: mockbeat
management:
enabled: true
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
disk:
max_size: 1G
`),
expectValidationError: "disk queue is not supported when management is enabled accessing config",
},
"managementFalseOutputLevelDiskQueue": {
input: []byte(`
name: mockbeat
management:
enabled: false
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
disk:
max_size: 1G
`),
expectValidationError: "",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
cfg, err := yaml.NewConfig(tc.input)
require.NoError(t, err)
config := beatConfig{}
err = cfg.Unpack(&config)
if tc.expectValidationError != "" {
require.Error(t, err)
require.Equal(t, tc.expectValidationError, err.Error())
} else {
require.NoError(t, err)
}
})
}
}
8 changes: 4 additions & 4 deletions libbeat/docs/queueconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ queue is responsible for buffering and combining events into batches that can
be consumed by the outputs. The outputs will use bulk operations to send a
batch of events in one transaction.

You can configure the type and behavior of the internal queue by setting
options in the `queue` section of the +{beatname_lc}.yml+ config file. Only one
queue type can be configured.

You can configure the type and behavior of the internal queue by
setting options in the `queue` section of the +{beatname_lc}.yml+
config file or by setting options in the `queue` section of the
output. Only one queue type can be configured.

This sample configuration sets the memory queue to buffer up to 4096 events:

Expand Down
6 changes: 5 additions & 1 deletion libbeat/outputs/console/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package console

import "github.com/elastic/beats/v7/libbeat/outputs/codec"
import (
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
)

type Config struct {
Codec codec.Config `config:"codec"`
Expand All @@ -26,6 +29,7 @@ type Config struct {
Pretty bool `config:"pretty"`

BatchSize int
Queue config.Namespace `config:"queue"`
}

var defaultConfig = Config{}
14 changes: 3 additions & 11 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"os"
"runtime"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand All @@ -43,13 +42,6 @@ type console struct {
index string
}

type consoleEvent struct {
Timestamp time.Time `json:"@timestamp" struct:"@timestamp"`

// Note: stdlib json doesn't support inlining :( -> use `codec: 2`, to generate proper event
Fields interface{} `struct:",inline"`
}

func init() {
outputs.RegisterType("console", makeConsole)
}
Expand Down Expand Up @@ -82,18 +74,18 @@ func makeConsole(
index := beat.Beat
c, err := newConsole(index, observer, enc)
if err != nil {
return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err))
return outputs.Fail(fmt.Errorf("console output initialization failed with: %w", err))
}

// check stdout actually being available
if runtime.GOOS != "windows" {
if _, err = c.out.Stat(); err != nil {
err = fmt.Errorf("console output initialization failed with: %v", err)
err = fmt.Errorf("console output initialization failed with: %w", err)
return outputs.Fail(err)
}
}

return outputs.Success(config.BatchSize, 0, c)
return outputs.Success(config.Queue, config.BatchSize, 0, c)
}

func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type elasticsearchConfig struct {
AllowOlderVersion bool `config:"allow_older_versions"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
Queue config.Namespace `config:"queue"`
}

type Backoff struct {
Expand Down
Loading

0 comments on commit 0bd2d73

Please sign in to comment.