From 0bd2d7392b3e1d2cf3340f67c5f390c74857819d Mon Sep 17 00:00:00 2001 From: Lee E Hinman <57081003+leehinman@users.noreply.github.com> Date: Thu, 19 Oct 2023 18:04:30 -0500 Subject: [PATCH] add support for queue settings under outputs (#36788) * add support for queue settings under outputs --- CHANGELOG.next.asciidoc | 4 +- libbeat/cmd/instance/beat.go | 49 ++++++ libbeat/cmd/instance/beat_test.go | 164 ++++++++++++++++++ libbeat/docs/queueconfig.asciidoc | 8 +- libbeat/outputs/console/config.go | 6 +- libbeat/outputs/console/console.go | 14 +- libbeat/outputs/elasticsearch/config.go | 1 + .../outputs/elasticsearch/elasticsearch.go | 34 ++-- libbeat/outputs/fileout/config.go | 24 +-- libbeat/outputs/fileout/file.go | 10 +- libbeat/outputs/kafka/config.go | 7 +- libbeat/outputs/kafka/kafka.go | 12 +- libbeat/outputs/logstash/config.go | 1 + libbeat/outputs/logstash/logstash.go | 18 +- libbeat/outputs/redis/config.go | 2 + libbeat/outputs/redis/redis.go | 34 ++-- libbeat/outputs/util.go | 57 +++++- libbeat/publisher/pipeline/client_test.go | 2 +- libbeat/publisher/pipeline/controller.go | 13 +- libbeat/publisher/pipeline/controller_test.go | 2 +- libbeat/publisher/pipeline/pipeline.go | 10 +- libbeat/publisher/pipeline/stress/out.go | 13 +- libbeat/publisher/queue/diskqueue/queue.go | 1 + libbeat/publisher/queue/memqueue/broker.go | 7 +- .../publisher/queue/memqueue/queue_test.go | 12 +- libbeat/publisher/queue/proxy/broker.go | 1 + libbeat/publisher/queue/queue.go | 2 +- 27 files changed, 383 insertions(+), 125 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index be6b54838e7..7c35197567b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -166,6 +166,7 @@ is collected by it. - Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322] - [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506] Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor +- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36788[36788] - Beats will now connect to older Elasticsearch instances by default {pull}36884[36884] *Auditbeat* @@ -216,8 +217,7 @@ is collected by it. - Added support for Okta OAuth2 provider in the httpjson input. {pull}36273[36273] - Add support of the interval parameter in Salesforce setupaudittrail-rest fileset. {issue}35917[35917] {pull}35938[35938] - Add device handling to Okta input package for entity analytics. {pull}36049[36049] -- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999] -- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}36286[36286] +- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30916[30916] {pull}36286[36286] - [Azure] Add input metrics to the azure-eventhub input. {pull}35739[35739] - Reduce HTTPJSON metrics allocations. {pull}36282[36282] - Add support for a simplified input configuraton when running under Elastic-Agent {pull}36390[36390] diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index e6596106b83..efe8bd48f79 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -66,6 +66,7 @@ import ( "github.com/elastic/beats/v7/libbeat/pprof" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/processing" + "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/file" @@ -783,6 +784,10 @@ func (b *Beat) configure(settings Settings) error { return fmt.Errorf("error unpacking config data: %w", err) } + if err := promoteOutputQueueSettings(&b.Config); err != nil { + return fmt.Errorf("could not promote output queue settings: %w", err) + } + if err := features.UpdateFromConfig(b.RawConfig); err != nil { return fmt.Errorf("could not parse features: %w", err) } @@ -1482,3 +1487,47 @@ func sanitizeIPs(ips []string) []string { } return validIPs } + +// promoteOutputQueueSettings checks to see if the output +// configuration has queue settings defined and if so it promotes them +// to the top level queue settings. This is done to allow existing +// behavior of specifying queue settings at the top level or like +// elastic-agent that specifies queue settings under the output +func promoteOutputQueueSettings(bc *beatConfig) error { + if bc.Output.IsSet() && bc.Output.Config().Enabled() { + pc := pipeline.Config{} + err := bc.Output.Config().Unpack(&pc) + if err != nil { + return fmt.Errorf("error unpacking output queue settings: %w", err) + } + if pc.Queue.IsSet() { + logp.Info("global queue settings replaced with output queue settings") + bc.Pipeline.Queue = pc.Queue + } + } + return nil +} + +func (bc *beatConfig) Validate() error { + if bc.Output.IsSet() && bc.Output.Config().Enabled() { + outputPC := pipeline.Config{} + err := bc.Output.Config().Unpack(&outputPC) + if err != nil { + return fmt.Errorf("error unpacking output queue settings: %w", err) + } + if bc.Pipeline.Queue.IsSet() && outputPC.Queue.IsSet() { + return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed") + } + //elastic-agent doesn't support disk queue yet + if bc.Management.Enabled() && outputPC.Queue.Config().Enabled() && outputPC.Queue.Name() == diskqueue.QueueType { + return fmt.Errorf("disk queue is not supported when management is enabled") + } + } + + //elastic-agent doesn't support disk queue yet + if bc.Management.Enabled() && bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType { + return fmt.Errorf("disk queue is not supported when management is enabled") + } + + return nil +} diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index fc8c88a4915..03474ecfcd9 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -27,7 +27,9 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/go-ucfg/yaml" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" @@ -269,3 +271,165 @@ func (r *outputReloaderMock) Reload( r.cfg = cfg return nil } + +func TestPromoteOutputQueueSettings(t *testing.T) { + tests := map[string]struct { + input []byte + memEvents int + }{ + "blank": { + input: []byte(""), + memEvents: 4096, + }, + "defaults": { + input: []byte(` +name: mockbeat +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + memEvents: 4096, + }, + "topLevelQueue": { + input: []byte(` +name: mockbeat +queue: + mem: + events: 8096 +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + memEvents: 8096, + }, + "outputLevelQueue": { + input: []byte(` +name: mockbeat +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + mem: + events: 8096 +`), + memEvents: 8096, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + cfg, err := yaml.NewConfig(tc.input) + require.NoError(t, err) + + config := beatConfig{} + err = cfg.Unpack(&config) + require.NoError(t, err) + + err = promoteOutputQueueSettings(&config) + require.NoError(t, err) + + ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config()) + require.NoError(t, err) + require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config()) + }) + } +} + +func TestValidateBeatConfig(t *testing.T) { + tests := map[string]struct { + input []byte + expectValidationError string + }{ + "blank": { + input: []byte(""), + expectValidationError: "", + }, + "defaults": { + input: []byte(` +name: mockbeat +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + expectValidationError: "", + }, + "topAndOutputLevelQueue": { + input: []byte(` +name: mockbeat +queue: + mem: + events: 2048 +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + mem: + events: 8096 +`), + expectValidationError: "top level queue and output level queue settings defined, only one is allowed accessing config", + }, + "managementTopLevelDiskQueue": { + input: []byte(` +name: mockbeat +management: + enabled: true +queue: + disk: + max_size: 1G +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + expectValidationError: "disk queue is not supported when management is enabled accessing config", + }, + "managementOutputLevelDiskQueue": { + input: []byte(` +name: mockbeat +management: + enabled: true +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + disk: + max_size: 1G +`), + expectValidationError: "disk queue is not supported when management is enabled accessing config", + }, + "managementFalseOutputLevelDiskQueue": { + input: []byte(` +name: mockbeat +management: + enabled: false +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + disk: + max_size: 1G +`), + expectValidationError: "", + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + cfg, err := yaml.NewConfig(tc.input) + require.NoError(t, err) + config := beatConfig{} + err = cfg.Unpack(&config) + if tc.expectValidationError != "" { + require.Error(t, err) + require.Equal(t, tc.expectValidationError, err.Error()) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/libbeat/docs/queueconfig.asciidoc b/libbeat/docs/queueconfig.asciidoc index fb930831dac..ade3bd2ec8e 100644 --- a/libbeat/docs/queueconfig.asciidoc +++ b/libbeat/docs/queueconfig.asciidoc @@ -9,10 +9,10 @@ queue is responsible for buffering and combining events into batches that can be consumed by the outputs. The outputs will use bulk operations to send a batch of events in one transaction. -You can configure the type and behavior of the internal queue by setting -options in the `queue` section of the +{beatname_lc}.yml+ config file. Only one -queue type can be configured. - +You can configure the type and behavior of the internal queue by +setting options in the `queue` section of the +{beatname_lc}.yml+ +config file or by setting options in the `queue` section of the +output. Only one queue type can be configured. This sample configuration sets the memory queue to buffer up to 4096 events: diff --git a/libbeat/outputs/console/config.go b/libbeat/outputs/console/config.go index 44869e388fa..e0a1cc9ff28 100644 --- a/libbeat/outputs/console/config.go +++ b/libbeat/outputs/console/config.go @@ -17,7 +17,10 @@ package console -import "github.com/elastic/beats/v7/libbeat/outputs/codec" +import ( + "github.com/elastic/beats/v7/libbeat/outputs/codec" + "github.com/elastic/elastic-agent-libs/config" +) type Config struct { Codec codec.Config `config:"codec"` @@ -26,6 +29,7 @@ type Config struct { Pretty bool `config:"pretty"` BatchSize int + Queue config.Namespace `config:"queue"` } var defaultConfig = Config{} diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 905aa778998..b81bf336348 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -23,7 +23,6 @@ import ( "fmt" "os" "runtime" - "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" @@ -43,13 +42,6 @@ type console struct { index string } -type consoleEvent struct { - Timestamp time.Time `json:"@timestamp" struct:"@timestamp"` - - // Note: stdlib json doesn't support inlining :( -> use `codec: 2`, to generate proper event - Fields interface{} `struct:",inline"` -} - func init() { outputs.RegisterType("console", makeConsole) } @@ -82,18 +74,18 @@ func makeConsole( index := beat.Beat c, err := newConsole(index, observer, enc) if err != nil { - return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err)) + return outputs.Fail(fmt.Errorf("console output initialization failed with: %w", err)) } // check stdout actually being available if runtime.GOOS != "windows" { if _, err = c.out.Stat(); err != nil { - err = fmt.Errorf("console output initialization failed with: %v", err) + err = fmt.Errorf("console output initialization failed with: %w", err) return outputs.Fail(err) } } - return outputs.Success(config.BatchSize, 0, c) + return outputs.Success(config.Queue, config.BatchSize, 0, c) } func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) { diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index ca77a44b833..e504f2dc213 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -45,6 +45,7 @@ type elasticsearchConfig struct { AllowOlderVersion bool `config:"allow_older_versions"` Transport httpcommon.HTTPTransportSettings `config:",inline"` + Queue config.Namespace `config:"queue"` } type Backoff struct { diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 9cd33ea8d8a..f7e38853924 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -41,7 +41,7 @@ func makeES( ) (outputs.Group, error) { log := logp.NewLogger(logSelector) if !cfg.HasField("bulk_max_size") { - cfg.SetInt("bulk_max_size", -1, defaultBulkSize) + _ = cfg.SetInt("bulk_max_size", -1, defaultBulkSize) } index, pipeline, err := buildSelectors(im, beat, cfg) @@ -49,12 +49,12 @@ func makeES( return outputs.Fail(err) } - config := defaultConfig - if err := cfg.Unpack(&config); err != nil { + esConfig := defaultConfig + if err := cfg.Unpack(&esConfig); err != nil { return outputs.Fail(err) } - policy, err := newNonIndexablePolicy(config.NonIndexablePolicy) + policy, err := newNonIndexablePolicy(esConfig.NonIndexablePolicy) if err != nil { log.Errorf("error while creating file identifier: %v", err) return outputs.Fail(err) @@ -65,12 +65,12 @@ func makeES( return outputs.Fail(err) } - if proxyURL := config.Transport.Proxy.URL; proxyURL != nil && !config.Transport.Proxy.Disable { + if proxyURL := esConfig.Transport.Proxy.URL; proxyURL != nil && !esConfig.Transport.Proxy.Disable { log.Debugf("breaking down proxy URL. Scheme: '%s', host[:port]: '%s', path: '%s'", proxyURL.Scheme, proxyURL.Host, proxyURL.Path) log.Infof("Using proxy URL: %s", proxyURL) } - params := config.Params + params := esConfig.Params if len(params) == 0 { params = nil } @@ -84,7 +84,7 @@ func makeES( clients := make([]outputs.NetworkClient, len(hosts)) for i, host := range hosts { - esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200) + esURL, err := common.MakeURL(esConfig.Protocol, esConfig.Path, host, 9200) if err != nil { log.Errorf("Invalid host param set: %s, Error: %+v", host, err) return outputs.Fail(err) @@ -95,16 +95,16 @@ func makeES( ConnectionSettings: eslegclient.ConnectionSettings{ URL: esURL, Beatname: beat.Beat, - Kerberos: config.Kerberos, - Username: config.Username, - Password: config.Password, - APIKey: config.APIKey, + Kerberos: esConfig.Kerberos, + Username: esConfig.Username, + Password: esConfig.Password, + APIKey: esConfig.APIKey, Parameters: params, - Headers: config.Headers, - CompressionLevel: config.CompressionLevel, + Headers: esConfig.Headers, + CompressionLevel: esConfig.CompressionLevel, Observer: observer, - EscapeHTML: config.EscapeHTML, - Transport: config.Transport, + EscapeHTML: esConfig.EscapeHTML, + Transport: esConfig.Transport, }, Index: index, Pipeline: pipeline, @@ -115,11 +115,11 @@ func makeES( return outputs.Fail(err) } - client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max) + client = outputs.WithBackoff(client, esConfig.Backoff.Init, esConfig.Backoff.Max) clients[i] = client } - return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients) + return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, clients) } func buildSelectors( diff --git a/libbeat/outputs/fileout/config.go b/libbeat/outputs/fileout/config.go index cfd28bfaaf2..e72a9f87d6f 100644 --- a/libbeat/outputs/fileout/config.go +++ b/libbeat/outputs/fileout/config.go @@ -21,21 +21,23 @@ import ( "fmt" "github.com/elastic/beats/v7/libbeat/outputs/codec" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/file" ) -type config struct { - Path string `config:"path"` - Filename string `config:"filename"` - RotateEveryKb uint `config:"rotate_every_kb" validate:"min=1"` - NumberOfFiles uint `config:"number_of_files"` - Codec codec.Config `config:"codec"` - Permissions uint32 `config:"permissions"` - RotateOnStartup bool `config:"rotate_on_startup"` +type fileOutConfig struct { + Path string `config:"path"` + Filename string `config:"filename"` + RotateEveryKb uint `config:"rotate_every_kb" validate:"min=1"` + NumberOfFiles uint `config:"number_of_files"` + Codec codec.Config `config:"codec"` + Permissions uint32 `config:"permissions"` + RotateOnStartup bool `config:"rotate_on_startup"` + Queue config.Namespace `config:"queue"` } -func defaultConfig() config { - return config{ +func defaultConfig() fileOutConfig { + return fileOutConfig{ NumberOfFiles: 7, RotateEveryKb: 10 * 1024, Permissions: 0600, @@ -43,7 +45,7 @@ func defaultConfig() config { } } -func (c *config) Validate() error { +func (c *fileOutConfig) Validate() error { if c.NumberOfFiles < 2 || c.NumberOfFiles > file.MaxBackupsLimit { return fmt.Errorf("the number_of_files to keep should be between 2 and %v", file.MaxBackupsLimit) diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 949d835f541..d12a11b25c3 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -51,8 +51,8 @@ func makeFileout( observer outputs.Observer, cfg *c.C, ) (outputs.Group, error) { - config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { + foConfig := defaultConfig() + if err := cfg.Unpack(&foConfig); err != nil { return outputs.Fail(err) } @@ -64,14 +64,14 @@ func makeFileout( beat: beat, observer: observer, } - if err := fo.init(beat, config); err != nil { + if err := fo.init(beat, foConfig); err != nil { return outputs.Fail(err) } - return outputs.Success(-1, 0, fo) + return outputs.Success(foConfig.Queue, -1, 0, fo) } -func (out *fileOutput) init(beat beat.Info, c config) error { +func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error { var path string if c.Filename != "" { path = filepath.Join(c.Path, c.Filename) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 7247699500f..8fff8dad0d5 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -76,6 +76,7 @@ type kafkaConfig struct { Codec codec.Config `config:"codec"` Sasl kafka.SaslConfig `config:"sasl"` EnableFAST bool `config:"enable_krb5_fast"` + Queue config.Namespace `config:"queue"` } type metaConfig struct { @@ -101,12 +102,6 @@ var compressionModes = map[string]sarama.CompressionCodec{ "snappy": sarama.CompressionSnappy, } -const ( - saslTypePlaintext = sarama.SASLTypePlaintext - saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256 - saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512 -) - func defaultConfig() kafkaConfig { return kafkaConfig{ Hosts: nil, diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index ef1c253981f..0c856ea425d 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -47,7 +47,7 @@ func makeKafka( log := logp.NewLogger(logSelector) log.Debug("initialize kafka output") - config, err := readConfig(cfg) + kConfig, err := readConfig(cfg) if err != nil { return outputs.Fail(err) } @@ -57,7 +57,7 @@ func makeKafka( return outputs.Fail(err) } - libCfg, err := newSaramaConfig(log, config) + libCfg, err := newSaramaConfig(log, kConfig) if err != nil { return outputs.Fail(err) } @@ -67,21 +67,21 @@ func makeKafka( return outputs.Fail(err) } - codec, err := codec.CreateEncoder(beat, config.Codec) + codec, err := codec.CreateEncoder(beat, kConfig.Codec) if err != nil { return outputs.Fail(err) } - client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, config.Headers, codec, libCfg) + client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg) if err != nil { return outputs.Fail(err) } retry := 0 - if config.MaxRetries < 0 { + if kConfig.MaxRetries < 0 { retry = -1 } - return outputs.Success(config.BulkMaxSize, retry, client) + return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, client) } func buildTopicSelector(cfg *config.C) (outil.Selector, error) { diff --git a/libbeat/outputs/logstash/config.go b/libbeat/outputs/logstash/config.go index 82747fe01d0..9df57514495 100644 --- a/libbeat/outputs/logstash/config.go +++ b/libbeat/outputs/logstash/config.go @@ -43,6 +43,7 @@ type Config struct { Proxy transport.ProxyConfig `config:",inline"` Backoff Backoff `config:"backoff"` EscapeHTML bool `config:"escape_html"` + Queue config.Namespace `config:"queue"` } type Backoff struct { diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 5e7cdfeee7a..072ec049f6f 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -41,7 +41,7 @@ func makeLogstash( observer outputs.Observer, cfg *conf.C, ) (outputs.Group, error) { - config, err := readConfig(cfg, beat) + lsConfig, err := readConfig(cfg, beat) if err != nil { return outputs.Fail(err) } @@ -51,14 +51,14 @@ func makeLogstash( return outputs.Fail(err) } - tls, err := tlscommon.LoadTLSConfig(config.TLS) + tls, err := tlscommon.LoadTLSConfig(lsConfig.TLS) if err != nil { return outputs.Fail(err) } transp := transport.Config{ - Timeout: config.Timeout, - Proxy: &config.Proxy, + Timeout: lsConfig.Timeout, + Proxy: &lsConfig.Proxy, TLS: tls, Stats: observer, } @@ -72,18 +72,18 @@ func makeLogstash( return outputs.Fail(err) } - if config.Pipelining > 0 { - client, err = newAsyncClient(beat, conn, observer, config) + if lsConfig.Pipelining > 0 { + client, err = newAsyncClient(beat, conn, observer, lsConfig) } else { - client, err = newSyncClient(beat, conn, observer, config) + client, err = newSyncClient(beat, conn, observer, lsConfig) } if err != nil { return outputs.Fail(err) } - client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max) + client = outputs.WithBackoff(client, lsConfig.Backoff.Init, lsConfig.Backoff.Max) clients[i] = client } - return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients) + return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, clients) } diff --git a/libbeat/outputs/redis/config.go b/libbeat/outputs/redis/config.go index 01c8f2e0238..4785af137f1 100644 --- a/libbeat/outputs/redis/config.go +++ b/libbeat/outputs/redis/config.go @@ -22,6 +22,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/outputs/codec" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/transport" "github.com/elastic/elastic-agent-libs/transport/tlscommon" ) @@ -40,6 +41,7 @@ type redisConfig struct { Db int `config:"db"` DataType string `config:"datatype"` Backoff backoff `config:"backoff"` + Queue config.Namespace `config:"queue"` } type backoff struct { diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 026cb04d4f8..9814d6abee7 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -34,10 +34,6 @@ import ( "github.com/elastic/elastic-agent-libs/transport/tlscommon" ) -type redisOut struct { - beat beat.Info -} - const ( defaultWaitRetry = 1 * time.Second defaultMaxWaitRetry = 60 * time.Second @@ -58,7 +54,9 @@ func makeRedis( ) (outputs.Group, error) { if !cfg.HasField("index") { - cfg.SetString("index", -1, beat.Beat) + if err := cfg.SetString("index", -1, beat.Beat); err != nil { + return outputs.Fail(err) + } } err := cfgwarn.CheckRemoved6xSettings(cfg, "port") @@ -77,13 +75,13 @@ func makeRedis( } } - config := defaultConfig - if err := cfg.Unpack(&config); err != nil { + rConfig := defaultConfig + if err := cfg.Unpack(&rConfig); err != nil { return outputs.Fail(err) } var dataType redisDataType - switch config.DataType { + switch rConfig.DataType { case "", "list": dataType = redisListType case "channel": @@ -102,7 +100,7 @@ func makeRedis( return outputs.Fail(err) } - tls, err := tlscommon.LoadTLSConfig(config.TLS) + tls, err := tlscommon.LoadTLSConfig(rConfig.TLS) if err != nil { return outputs.Fail(err) } @@ -129,8 +127,8 @@ func makeRedis( } transp := transport.Config{ - Timeout: config.Timeout, - Proxy: &config.Proxy, + Timeout: rConfig.Timeout, + Proxy: &rConfig.Proxy, TLS: tls, Stats: observer, } @@ -138,7 +136,7 @@ func makeRedis( switch hostUrl.Scheme { case redisScheme: if hasScheme { - transp.TLS = nil // disable TLS if user explicitely set `redis` scheme + transp.TLS = nil // disable TLS if user explicitly set `redis` scheme } case tlsRedisScheme: if transp.TLS == nil { @@ -151,23 +149,23 @@ func makeRedis( return outputs.Fail(err) } - pass := config.Password + pass := rConfig.Password hostPass, passSet := hostUrl.User.Password() if passSet { pass = hostPass } - enc, err := codec.CreateEncoder(beat, config.Codec) + enc, err := codec.CreateEncoder(beat, rConfig.Codec) if err != nil { return outputs.Fail(err) } - client := newClient(conn, observer, config.Timeout, - pass, config.Db, key, dataType, config.Index, enc) - clients[i] = newBackoffClient(client, config.Backoff.Init, config.Backoff.Max) + client := newClient(conn, observer, rConfig.Timeout, + pass, rConfig.Db, key, dataType, rConfig.Index, enc) + clients[i] = newBackoffClient(client, rConfig.Backoff.Init, rConfig.Backoff.Max) } - return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients) + return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, rConfig.MaxRetries, clients) } func buildKeySelector(cfg *config.C) (outil.Selector, error) { diff --git a/libbeat/outputs/util.go b/libbeat/outputs/util.go index 15068910f8c..ce8765b5c2e 100644 --- a/libbeat/outputs/util.go +++ b/libbeat/outputs/util.go @@ -17,16 +17,52 @@ package outputs +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" + "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + "github.com/elastic/elastic-agent-libs/config" +) + // Fail helper can be used by output factories, to create a failure response when // loading an output must return an error. func Fail(err error) (Group, error) { return Group{}, err } -// Success create a valid output Group response for a set of client instances. -func Success(batchSize, retry int, clients ...Client) (Group, error) { +// Success create a valid output Group response for a set of client +// instances. The first argument is expected to contain a queue +// config.Namespace. The queue config is passed to assign the queue +// factory when elastic-agent reloads the output. +func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Group, error) { + var q queue.QueueFactory + if cfg.IsSet() && cfg.Config().Enabled() { + switch cfg.Name() { + case memqueue.QueueType: + settings, err := memqueue.SettingsForUserConfig(cfg.Config()) + if err != nil { + return Group{}, fmt.Errorf("unable to get memory queue settings: %w", err) + } + q = memqueue.FactoryForSettings(settings) + case diskqueue.QueueType: + if publisher.UnderAgent() { + return Group{}, fmt.Errorf("disk queue not supported under agent") + } + settings, err := diskqueue.SettingsForUserConfig(cfg.Config()) + if err != nil { + return Group{}, fmt.Errorf("unable to get disk queue settings: %w", err) + } + q = diskqueue.FactoryForSettings(settings) + default: + return Group{}, fmt.Errorf("unknown queue type: %s", cfg.Name()) + } + } return Group{ - Clients: clients, - BatchSize: batchSize, - Retry: retry, + Clients: clients, + BatchSize: batchSize, + Retry: retry, + QueueFactory: q, }, nil } @@ -39,11 +75,16 @@ func NetworkClients(netclients []NetworkClient) []Client { return clients } -func SuccessNet(loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) { +// SuccessNet create a valid output Group and creates client instances +// The first argument is expected to contain a queue config.Namespace. +// The queue config is passed to assign the queue factory when +// elastic-agent reloads the output. +func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) { + if !loadbalance { - return Success(batchSize, retry, NewFailoverClient(netclients)) + return Success(cfg, batchSize, retry, NewFailoverClient(netclients)) } clients := NetworkClients(netclients) - return Success(batchSize, retry, clients...) + return Success(cfg, batchSize, retry, clients...) } diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index 4a212092c7e..15260172ff5 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -144,7 +144,7 @@ func TestClientWaitClose(t *testing.T) { err := logp.TestingSetup() assert.Nil(t, err) - q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}) + q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0) pipeline := makePipeline(Settings{}, q) defer pipeline.Close() diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index bf080677ef4..1c480c01bce 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -62,6 +62,13 @@ type outputController struct { consumer *eventConsumer workers []outputWorker + // The InputQueueSize can be set when the Beat is started, in + // libbeat/cmd/instance/Settings we need to preserve that + // value and pass it into the queue factory. The queue + // factory could be made from elastic-agent output + // configuration reloading which doesn't have access to this + // setting. + inputQueueSize int } type producerRequest struct { @@ -81,6 +88,7 @@ func newOutputController( observer outputObserver, eventWaitGroup *sync.WaitGroup, queueFactory queue.QueueFactory, + inputQueueSize int, ) (*outputController, error) { controller := &outputController{ beat: beat, @@ -90,6 +98,7 @@ func newOutputController( queueFactory: queueFactory, workerChan: make(chan publisher.Batch), consumer: newEventConsumer(monitors.Logger, observer), + inputQueueSize: inputQueueSize, } return controller, nil @@ -258,11 +267,11 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) { factory = c.queueFactory } - queue, err := factory(logger, c.onACK) + queue, err := factory(logger, c.onACK, c.inputQueueSize) if err != nil { logger.Errorf("queue creation failed, falling back to default memory queue, check your queue configuration") s, _ := memqueue.SettingsForUserConfig(nil) - queue = memqueue.NewQueue(logger, c.onACK, s) + queue = memqueue.NewQueue(logger, c.onACK, s, c.inputQueueSize) } c.queue = queue diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 366f4bff1d9..7384e5f7128 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -189,7 +189,7 @@ func TestOutputQueueFactoryTakesPrecedence(t *testing.T) { func TestFailedQueueFactoryRevertsToDefault(t *testing.T) { defaultSettings, _ := memqueue.SettingsForUserConfig(nil) - failedFactory := func(_ *logp.Logger, _ func(int)) (queue.Queue, error) { + failedFactory := func(_ *logp.Logger, _ func(int), _ int) (queue.Queue, error) { return nil, fmt.Errorf("This queue creation intentionally failed") } controller := outputController{ diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 209688bb5c2..cf03163750e 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -153,13 +153,12 @@ func New( if b := userQueueConfig.Name(); b != "" { queueType = b } - queueFactory, err := queueFactoryForUserConfig( - queueType, userQueueConfig.Config(), settings.InputQueueSize) + queueFactory, err := queueFactoryForUserConfig(queueType, userQueueConfig.Config()) if err != nil { return nil, err } - output, err := newOutputController(beat, monitors, p.observer, p.eventWaitGroup, queueFactory) + output, err := newOutputController(beat, monitors, p.observer, p.eventWaitGroup, queueFactory, settings.InputQueueSize) if err != nil { return nil, err } @@ -399,16 +398,13 @@ func (p *Pipeline) OutputReloader() OutputReloader { // This helper exists to frontload config parsing errors: if there is an // error in the queue config, we want it to show up as fatal during // initialization, even if the queue itself isn't created until later. -func queueFactoryForUserConfig(queueType string, userConfig *conf.C, inQueueSize int) (queue.QueueFactory, error) { +func queueFactoryForUserConfig(queueType string, userConfig *conf.C) (queue.QueueFactory, error) { switch queueType { case memqueue.QueueType: settings, err := memqueue.SettingsForUserConfig(userConfig) if err != nil { return nil, err } - // The memory queue has a special override during pipeline - // initialization for the size of its API channel buffer. - settings.InputQueueSize = inQueueSize return memqueue.FactoryForSettings(settings), nil case diskqueue.QueueType: settings, err := diskqueue.SettingsForUserConfig(userConfig) diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go index 6aa510de1b0..d1014b8d782 100644 --- a/libbeat/publisher/pipeline/stress/out.go +++ b/libbeat/publisher/pipeline/stress/out.go @@ -35,11 +35,12 @@ type testOutput struct { } type testOutputConfig struct { - Worker int `config:"worker" validate:"min=1"` - BulkMaxSize int `config:"bulk_max_size"` - Retry int `config:"retry"` - MinWait time.Duration `config:"min_wait"` - MaxWait time.Duration `config:"max_wait"` + Worker int `config:"worker" validate:"min=1"` + BulkMaxSize int `config:"bulk_max_size"` + Retry int `config:"retry"` + MinWait time.Duration `config:"min_wait"` + MaxWait time.Duration `config:"max_wait"` + Queue conf.Namespace `config:"queue"` Fail struct { EveryBatch int } @@ -66,7 +67,7 @@ func makeTestOutput(_ outputs.IndexManager, beat beat.Info, observer outputs.Obs clients[i] = client } - return outputs.Success(config.BulkMaxSize, config.Retry, clients...) + return outputs.Success(config.Queue, config.BulkMaxSize, config.Retry, clients...) } func (*testOutput) Close() error { return nil } diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index 2b754890882..74fff3fea64 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -109,6 +109,7 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { return func( logger *logp.Logger, ackCallback func(eventCount int), + inputQueueSize int, ) (queue.Queue, error) { return NewQueue(logger, ackCallback, settings) } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 0bb3ff9ed8e..ac5b9dc6615 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -84,7 +84,6 @@ type Settings struct { Events int FlushMinEvents int FlushTimeout time.Duration - InputQueueSize int } type queueEntry struct { @@ -123,8 +122,9 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { return func( logger *logp.Logger, ackCallback func(eventCount int), + inputQueueSize int, ) (queue.Queue, error) { - return NewQueue(logger, ackCallback, settings), nil + return NewQueue(logger, ackCallback, settings, inputQueueSize), nil } } @@ -135,6 +135,7 @@ func NewQueue( logger *logp.Logger, ackCallback func(eventCount int), settings Settings, + inputQueueSize int, ) *broker { var ( sz = settings.Events @@ -142,7 +143,7 @@ func NewQueue( flushTimeout = settings.FlushTimeout ) - chanSize := AdjustInputQueueSize(settings.InputQueueSize, sz) + chanSize := AdjustInputQueueSize(inputQueueSize, sz) if minEvents < 1 { minEvents = 1 diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index ef9ee52a944..28cc38025c3 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -103,7 +103,7 @@ func TestQueueMetricsBuffer(t *testing.T) { } func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) { - testQueue := NewQueue(nil, nil, settings) + testQueue := NewQueue(nil, nil, settings, 0) defer testQueue.Close() // Send events to queue @@ -147,7 +147,7 @@ func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.Queu Events: sz, FlushMinEvents: minEvents, FlushTimeout: flushTimeout, - }) + }, 0) } } @@ -258,22 +258,22 @@ func TestEntryIDs(t *testing.T) { } t.Run("acking in forward order with directEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000}) + testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0) testForward(testQueue) }) t.Run("acking in reverse order with directEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000}) + testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0) testBackward(testQueue) }) t.Run("acking in forward order with bufferedEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}) + testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}, 0) testForward(testQueue) }) t.Run("acking in reverse order with bufferedEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}) + testQueue := NewQueue(nil, nil, Settings{Events: 1000, FlushMinEvents: 2, FlushTimeout: time.Microsecond}, 0) testBackward(testQueue) }) } diff --git a/libbeat/publisher/queue/proxy/broker.go b/libbeat/publisher/queue/proxy/broker.go index 20400e3ab75..832739cc26d 100644 --- a/libbeat/publisher/queue/proxy/broker.go +++ b/libbeat/publisher/queue/proxy/broker.go @@ -90,6 +90,7 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { return func( logger *logp.Logger, ackCallback func(eventCount int), + inputQueueSize int, ) (queue.Queue, error) { return NewQueue(logger, ackCallback, settings), nil } diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index d0e1c047610..101a3290117 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -74,7 +74,7 @@ type Queue interface { Metrics() (Metrics, error) } -type QueueFactory func(logger *logp.Logger, ack func(eventCount int)) (Queue, error) +type QueueFactory func(logger *logp.Logger, ack func(eventCount int), inputQueueSize int) (Queue, error) // BufferConfig returns the pipelines buffering settings, // for the pipeline to use.