diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index c3b10d03f79..7dbbb98bbef 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -1338,6 +1338,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is auditbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "auditbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index def62fbca7c..84bd161d88f 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -2434,6 +2434,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is filebeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "filebeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index fdbdcbe7042..ff2ac644a9d 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -1430,6 +1430,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is heartbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "heartbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/libbeat/_meta/config/default.reference.yml.tmpl b/libbeat/_meta/config/default.reference.yml.tmpl index 073340343d8..0b22112ed37 100644 --- a/libbeat/_meta/config/default.reference.yml.tmpl +++ b/libbeat/_meta/config/default.reference.yml.tmpl @@ -14,6 +14,7 @@ {{template "setup.dashboards.reference.yml.tmpl" .}} {{template "setup.template.reference.yml.tmpl" .}} {{template "setup.ilm.reference.yml.tmpl" .}} +{{template "setup.dsl.reference.yml.tmpl" .}} {{template "setup.kibana.reference.yml.tmpl" .}} {{template "logging.reference.yml.tmpl" .}} {{template "monitoring.reference.yml.tmpl" .}} diff --git a/libbeat/_meta/config/setup.dsl.reference.yml.tmpl b/libbeat/_meta/config/setup.dsl.reference.yml.tmpl new file mode 100644 index 00000000000..e66e83f17c2 --- /dev/null +++ b/libbeat/_meta/config/setup.dsl.reference.yml.tmpl @@ -0,0 +1,24 @@ +{{header "Data Stream Lifecycle (DSL)"}} + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is {{.BeatName}}-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "{{.BeatName}}-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false \ No newline at end of file diff --git a/libbeat/cmd/export/ilm_policy.go b/libbeat/cmd/export/ilm_policy.go index b2a8b845514..60c97920fd7 100644 --- a/libbeat/cmd/export/ilm_policy.go +++ b/libbeat/cmd/export/ilm_policy.go @@ -22,7 +22,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cmd/instance" "github.com/elastic/beats/v7/libbeat/idxmgmt" - "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" ) // GenGetILMPolicyCmd is the command used to export the ilm policy. @@ -35,14 +35,20 @@ func GenGetILMPolicyCmd(settings instance.Settings) *cobra.Command { dir, _ := cmd.Flags().GetString("dir") if settings.ILM == nil { - settings.ILM = ilm.StdSupport + settings.ILM = lifecycle.StdSupport } b, err := instance.NewInitializedBeat(settings) if err != nil { fatalfInitCmd(err) } - clientHandler := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version)) + // the way this works, we decide to export ILM or DSL based on the user's config. + // This means that if a user has no index management config, we'll default to ILM, regardless of what the user + // is connected to. Might not be a problem since a user who doesn't have any custom lifecycle config has nothing to export? + clientHandler, err := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version), b.Info, b.Config.LifecycleConfig) + if err != nil { + fatalf("error creating file handler: %s", err) + } idxManager := b.IdxSupporter.Manager(clientHandler, idxmgmt.BeatsAssets(b.Fields)) if err := idxManager.Setup(idxmgmt.LoadModeDisabled, idxmgmt.LoadModeForce); err != nil { fatalf("Error exporting ilm-policy: %+v.", err) diff --git a/libbeat/cmd/export/template.go b/libbeat/cmd/export/template.go index 23d572fad2a..ffd957961ef 100644 --- a/libbeat/cmd/export/template.go +++ b/libbeat/cmd/export/template.go @@ -22,7 +22,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cmd/instance" "github.com/elastic/beats/v7/libbeat/idxmgmt" - "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" ) // GenTemplateConfigCmd is the command used to export the elasticsearch template. @@ -36,10 +36,10 @@ func GenTemplateConfigCmd(settings instance.Settings) *cobra.Command { noILM, _ := cmd.Flags().GetBool("noilm") if noILM { - settings.ILM = ilm.NoopSupport + settings.ILM = lifecycle.NoopSupport } if settings.ILM == nil { - settings.ILM = ilm.StdSupport + settings.ILM = lifecycle.StdSupport } b, err := instance.NewInitializedBeat(settings) @@ -47,7 +47,10 @@ func GenTemplateConfigCmd(settings instance.Settings) *cobra.Command { fatalfInitCmd(err) } - clientHandler := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version)) + clientHandler, err := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version), b.Info, b.Config.LifecycleConfig) + if err != nil { + fatalf("error creating file handler: %s", err) + } idxManager := b.IdxSupporter.Manager(clientHandler, idxmgmt.BeatsAssets(b.Fields)) if err := idxManager.Setup(idxmgmt.LoadModeForce, idxmgmt.LoadModeDisabled); err != nil { fatalf("Error exporting template: %+v.", err) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 8bd493e1777..4e72996c966 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -54,6 +54,7 @@ import ( "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/features" "github.com/elastic/beats/v7/libbeat/idxmgmt" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/kibana" "github.com/elastic/beats/v7/libbeat/management" @@ -133,6 +134,9 @@ type beatConfig struct { // monitoring settings MonitoringBeatConfig monitoring.BeatConfig `config:",inline"` + // ILM settings + LifecycleConfig lifecycle.RawConfig `config:",inline"` + // central management settings Management *config.C `config:"management"` @@ -671,7 +675,13 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er if setup.IndexManagement || setup.ILMPolicy { loadILM = idxmgmt.LoadModeEnabled } - m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields)) + + mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Config.LifecycleConfig) + if err != nil { + return fmt.Errorf("error creating index management handler: %w", err) + } + + m := b.IdxSupporter.Manager(mgmtHandler, idxmgmt.BeatsAssets(b.Fields)) if ok, warn := m.VerifySetup(loadTemplate, loadILM); !ok { fmt.Println(warn) } @@ -1065,7 +1075,11 @@ func (b *Beat) registerESIndexManagement() error { func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback { return func(esClient *eslegclient.Connection) error { - m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields)) + mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Config.LifecycleConfig) + if err != nil { + return fmt.Errorf("error creating index management handler: %w", err) + } + m := b.IdxSupporter.Manager(mgmtHandler, idxmgmt.BeatsAssets(b.Fields)) return m.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeEnabled) } } diff --git a/libbeat/cmd/instance/settings.go b/libbeat/cmd/instance/settings.go index b46f248917c..5cf6b4eca19 100644 --- a/libbeat/cmd/instance/settings.go +++ b/libbeat/cmd/instance/settings.go @@ -22,7 +22,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/idxmgmt" - "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/monitoring/report" "github.com/elastic/beats/v7/libbeat/publisher/processing" ) @@ -42,7 +42,7 @@ type Settings struct { // load custom index manager. The config object will be the Beats root configuration. IndexManagement idxmgmt.SupportFactory - ILM ilm.SupportFactory + ILM lifecycle.SupportFactory Processing processing.SupportFactory diff --git a/libbeat/common/fmtstr/formatevents.go b/libbeat/common/fmtstr/formatevents.go index 68477073da2..52ae7b65057 100644 --- a/libbeat/common/fmtstr/formatevents.go +++ b/libbeat/common/fmtstr/formatevents.go @@ -84,7 +84,6 @@ type eventEvalContext struct { } var ( - errMissingKeys = errors.New("missing keys") errConvertString = errors.New("can not convert to string") ) @@ -157,12 +156,12 @@ func CompileEvent(in string) (*EventFormatString, error) { func (fs *EventFormatString) Unpack(v interface{}) error { s, err := tryConvString(v) if err != nil { - return err + return fmt.Errorf("error converting type %T to event formatter: %w", v, err) } tmp, err := CompileEvent(s) if err != nil { - return err + return fmt.Errorf("error compiling event formatter: %w", err) } // init fs from tmp @@ -170,6 +169,11 @@ func (fs *EventFormatString) Unpack(v interface{}) error { return nil } +// IsInitialized returns true if the underlying event formatter is prepared to format an event +func (fs *EventFormatString) IsInitialized() bool { + return fs.formatter != nil +} + // NumFields returns number of unique event fields used by the format string. func (fs *EventFormatString) NumFields() int { return len(fs.fields) @@ -190,6 +194,9 @@ func (fs *EventFormatString) Fields() []string { // Run executes the format string returning a new expanded string or an error // if execution or event field expansion fails. func (fs *EventFormatString) Run(event *beat.Event) (string, error) { + if !fs.IsInitialized() { + return "", fmt.Errorf("event formatter is nil") + } ctx := newEventCtx(len(fs.fields)) defer releaseCtx(ctx) @@ -296,7 +303,7 @@ func (e *eventFieldCompiler) compileEventField( ops []VariableOp, ) (FormatEvaler, error) { if len(ops) > 1 { - return nil, errors.New("Too many format modifiers given") + return nil, errors.New("too many format modifiers given") } defaultValue := "" @@ -340,12 +347,12 @@ func (e *eventFieldCompiler) compileTimestamp( ops []VariableOp, ) (FormatEvaler, error) { if expression[0] != '+' { - return nil, errors.New("No timestamp expression") + return nil, errors.New("no timestamp expression") } formatter, err := dtfmt.NewFormatter(expression[1:]) if err != nil { - return nil, fmt.Errorf("%v in timestamp expression", err) + return nil, fmt.Errorf("%w in timestamp expression", err) } e.timestamp = true @@ -353,10 +360,6 @@ func (e *eventFieldCompiler) compileTimestamp( } func (e *eventFieldEvaler) Eval(c interface{}, out *bytes.Buffer) error { - type stringer interface { - String() string - } - ctx := c.(*eventEvalContext) s := ctx.keys[e.index] _, err := out.WriteString(s) @@ -364,10 +367,6 @@ func (e *eventFieldEvaler) Eval(c interface{}, out *bytes.Buffer) error { } func (e *defaultEventFieldEvaler) Eval(c interface{}, out *bytes.Buffer) error { - type stringer interface { - String() string - } - ctx := c.(*eventEvalContext) s := ctx.keys[e.index] if s == "" { @@ -385,7 +384,7 @@ func (e *eventTimestampEvaler) Eval(c interface{}, out *bytes.Buffer) error { func parseEventPath(field string) (string, error) { field = strings.Trim(field, " \n\r\t") - var fields []string + fields := []string{} for len(field) > 0 { if field[0] != '[' { diff --git a/libbeat/dashboards/kibana_loader.go b/libbeat/dashboards/kibana_loader.go index 5e67c7576bd..55d195c4f8e 100644 --- a/libbeat/dashboards/kibana_loader.go +++ b/libbeat/dashboards/kibana_loader.go @@ -61,7 +61,7 @@ type KibanaLoader struct { // NewKibanaLoader creates a new loader to load Kibana files func NewKibanaLoader(ctx context.Context, cfg *config.C, dashboardsConfig *Config, hostname string, msgOutputter MessageOutputter, beatname string) (*KibanaLoader, error) { if cfg == nil || !cfg.Enabled() { - return nil, fmt.Errorf("Kibana is not configured or enabled") + return nil, fmt.Errorf("kibana is not configured or enabled") } client, err := getKibanaClient(ctx, cfg, dashboardsConfig.Retry, 0, beatname) @@ -127,7 +127,7 @@ func (loader KibanaLoader) ImportIndexFile(file string) error { // ImportIndex imports the passed index pattern to Kibana func (loader KibanaLoader) ImportIndex(pattern mapstr.M) error { if loader.version.LessThan(minimumRequiredVersionSavedObjects) { - return fmt.Errorf("Kibana version must be at least " + minimumRequiredVersionSavedObjects.String()) + return fmt.Errorf("kibana version must be at least " + minimumRequiredVersionSavedObjects.String()) } var errs multierror.Errors diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 9736af191fd..322d0f08dbf 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -423,6 +423,7 @@ func (conn *Connection) getVersion() error { } if versionData.Version.BuildFlavor == "serverless" { + conn.log.Info("build flavor of es is severless, marking connection as serverless") conn.isServerless = true } else if versionData.Version.BuildFlavor == "default" { conn.isServerless = false diff --git a/libbeat/idxmgmt/client_handler.go b/libbeat/idxmgmt/client_handler.go index 9347cdb27cd..ae92f75a36c 100644 --- a/libbeat/idxmgmt/client_handler.go +++ b/libbeat/idxmgmt/client_handler.go @@ -18,19 +18,22 @@ package idxmgmt import ( - "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" + "fmt" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/template" "github.com/elastic/elastic-agent-libs/version" ) // ClientHandler defines the interface between a remote service and the Manager for ILM and templates. type ClientHandler interface { - ilm.ClientHandler + lifecycle.ClientHandler template.Loader } type clientHandler struct { - ilm.ClientHandler + lifecycle.ClientHandler template.Loader } @@ -50,18 +53,30 @@ type FileClient interface { } // NewClientHandler initializes and returns a new instance of ClientHandler -func NewClientHandler(ilm ilm.ClientHandler, template template.Loader) ClientHandler { +func NewClientHandler(ilm lifecycle.ClientHandler, template template.Loader) ClientHandler { return &clientHandler{ilm, template} } // NewESClientHandler returns a new ESLoader instance, // initialized with an ilm and template client handler based on the passed in client. -func NewESClientHandler(c ESClient) ClientHandler { - return NewClientHandler(ilm.NewESClientHandler(c), template.NewESLoader(c)) +func NewESClientHandler(client ESClient, info beat.Info, cfg lifecycle.RawConfig) (ClientHandler, error) { + esHandler, err := lifecycle.NewESClientHandler(client, info, cfg) + if err != nil { + return nil, fmt.Errorf("error creating ES handler: %w", err) + } + loader, err := template.NewESLoader(client, esHandler) + if err != nil { + return nil, fmt.Errorf("error creating ES loader: %w", err) + } + return NewClientHandler(esHandler, loader), nil } // NewFileClientHandler returns a new ESLoader instance, // initialized with an ilm and template client handler based on the passed in client. -func NewFileClientHandler(c FileClient) ClientHandler { - return NewClientHandler(ilm.NewFileClientHandler(c), template.NewFileLoader(c)) +func NewFileClientHandler(client FileClient, info beat.Info, cfg lifecycle.RawConfig) (ClientHandler, error) { + mgmt, err := lifecycle.NewFileClientHandler(client, info, cfg) + if err != nil { + return nil, fmt.Errorf("error creating client handler: %w", err) + } + return NewClientHandler(mgmt, template.NewFileLoader(client, mgmt.Mode() == lifecycle.DSL)), nil } diff --git a/libbeat/idxmgmt/idxmgmt.go b/libbeat/idxmgmt/idxmgmt.go index 42972a7bc88..4cd3fbe93e3 100644 --- a/libbeat/idxmgmt/idxmgmt.go +++ b/libbeat/idxmgmt/idxmgmt.go @@ -22,7 +22,7 @@ import ( "fmt" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/template" "github.com/elastic/elastic-agent-libs/config" @@ -99,37 +99,47 @@ func DefaultSupport(log *logp.Logger, info beat.Info, configRoot *config.C) (Sup // MakeDefaultSupport creates some default index management support, with a // custom ILM support implementation. -func MakeDefaultSupport(ilmSupport ilm.SupportFactory) SupportFactory { +func MakeDefaultSupport(ilmSupport lifecycle.SupportFactory) SupportFactory { if ilmSupport == nil { - ilmSupport = ilm.DefaultSupport + ilmSupport = lifecycle.DefaultSupport } return func(log *logp.Logger, info beat.Info, configRoot *config.C) (Supporter, error) { const logName = "index-management" + if log == nil { + log = logp.NewLogger(logName) + } else { + log = log.Named(logName) + } + // now that we have the "correct" default, unpack the rest of the config cfg := struct { - ILM *config.C `config:"setup.ilm"` - Template *config.C `config:"setup.template"` - Output config.Namespace `config:"output"` - Migration *config.C `config:"migration.6_to_7"` + Lifecycle lifecycle.RawConfig `config:",inline"` + Template *config.C `config:"setup.template"` + Output config.Namespace `config:"output"` + Migration *config.C `config:"migration.6_to_7"` }{} if configRoot != nil { if err := configRoot.Unpack(&cfg); err != nil { - return nil, err + return nil, fmt.Errorf("error unpacking cfg settings while setting up index support: %w", err) } } - if log == nil { - log = logp.NewLogger(logName) - } else { - log = log.Named(logName) + // consider lifecycles enabled if the user has explicitly enabled them, + // or if no `enabled` setting has been set by the user, thus reverting to a default of enabled. + enabled := false + if cfg.Lifecycle.DSL.Enabled() || cfg.Lifecycle.ILM.Enabled() { + enabled = true + } + if (cfg.Lifecycle.DSL == nil || !cfg.Lifecycle.DSL.HasField("enabled")) && (cfg.Lifecycle.ILM == nil || !cfg.Lifecycle.ILM.HasField("enabled")) { + enabled = true } if err := checkTemplateESSettings(cfg.Template, cfg.Output); err != nil { return nil, err } - return newIndexSupport(log, info, ilmSupport, cfg.Template, cfg.ILM, cfg.Migration.Enabled()) + return newIndexSupport(log, info, ilmSupport, cfg.Template, enabled, cfg.Migration.Enabled()) } } @@ -156,7 +166,7 @@ func checkTemplateESSettings(tmpl *config.C, out config.Namespace) error { var tmplCfg template.TemplateConfig if tmpl != nil { if err := tmpl.Unpack(&tmplCfg); err != nil { - return fmt.Errorf("unpacking template config fails: %v", err) + return fmt.Errorf("unpacking template config fails: %w", err) } } diff --git a/libbeat/idxmgmt/ilm/client_handler.go b/libbeat/idxmgmt/ilm/client_handler.go deleted file mode 100644 index 1f7d6f278e3..00000000000 --- a/libbeat/idxmgmt/ilm/client_handler.go +++ /dev/null @@ -1,146 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package ilm - -import ( - "fmt" - "path" - - "github.com/elastic/elastic-agent-libs/version" -) - -// ClientHandler defines the interface between a remote service and the Manager. -type ClientHandler interface { - CheckILMEnabled(bool) (bool, error) - HasILMPolicy(name string) (bool, error) - CreateILMPolicy(policy Policy) error -} - -// ESClientHandler implements the Loader interface for talking to ES. -type ESClientHandler struct { - client ESClient -} - -type VersionCheckerClient interface { - GetVersion() version.V -} - -// ESClient defines the minimal interface required for the Loader to -// prepare a policy. -type ESClient interface { - VersionCheckerClient - IsServerless() bool - Request( - method, path string, - pipeline string, - params map[string]string, - body interface{}, - ) (int, []byte, error) -} - -// FileClientHandler implements the Loader interface for writing to a file. -type FileClientHandler struct { - client FileClient -} - -// FileClient defines the minimal interface required for the Loader to -// prepare a policy. -type FileClient interface { - GetVersion() version.V - Write(component string, name string, body string) error -} - -const ( - esILMPath = "/_ilm/policy" -) - -var ( - esMinDefaultILMVersion = version.MustNew("7.0.0") -) - -// NewESClientHandler initializes and returns an ESClientHandler, -func NewESClientHandler(c ESClient) *ESClientHandler { - return &ESClientHandler{client: c} -} - -// NewFileClientHandler initializes and returns a new FileClientHandler instance. -func NewFileClientHandler(c FileClient) *FileClientHandler { - return &FileClientHandler{client: c} -} - -// CheckILMEnabled indicates whether or not ILM is supported for the configured mode and ES instance. -func (h *ESClientHandler) CheckILMEnabled(enabled bool) (bool, error) { - return checkILMEnabled(enabled, h.client) -} - -// CreateILMPolicy loads the given policy to Elasticsearch. -func (h *ESClientHandler) CreateILMPolicy(policy Policy) error { - path := path.Join(esILMPath, policy.Name) - _, _, err := h.client.Request("PUT", path, "", nil, policy.Body) - return err -} - -// HasILMPolicy queries Elasticsearch to see if policy with given name exists. -func (h *ESClientHandler) HasILMPolicy(name string) (bool, error) { - // XXX: HEAD method does currently not work for checking if a policy exists - path := path.Join(esILMPath, name) - status, b, err := h.client.Request("GET", path, "", nil, nil) - if err != nil && status != 404 { - return false, wrapErrf(err, ErrRequestFailed, - "failed to check for policy name '%v': (status=%v) %s", name, status, b) - } - return status == 200, nil -} - -// CheckILMEnabled indicates whether or not ILM is supported for the configured mode and client version. -// If the connected ES instance is serverless, this will return false -func (h *FileClientHandler) CheckILMEnabled(enabled bool) (bool, error) { - return checkILMEnabled(enabled, h.client) -} - -func checkILMEnabled(enabled bool, c VersionCheckerClient) (bool, error) { - if !enabled { - return false, nil - } - - if esClient, ok := c.(ESClient); ok { - if esClient.IsServerless() { - return false, nil - } - } - - ver := c.GetVersion() - if ver.LessThan(esMinDefaultILMVersion) { - return false, errf(ErrESVersionNotSupported, "Elasticsearch %v does not support ILM", ver.String()) - } - return true, nil -} - -// CreateILMPolicy writes given policy to the configured file. -func (h *FileClientHandler) CreateILMPolicy(policy Policy) error { - str := fmt.Sprintf("%s\n", policy.Body.StringToPrint()) - if err := h.client.Write("policy", policy.Name, str); err != nil { - return fmt.Errorf("error printing policy : %w", err) - } - return nil -} - -// HasILMPolicy always returns false. -func (h *FileClientHandler) HasILMPolicy(name string) (bool, error) { - return false, nil -} diff --git a/libbeat/idxmgmt/ilm/config.go b/libbeat/idxmgmt/ilm/config.go deleted file mode 100644 index c076c0565e9..00000000000 --- a/libbeat/idxmgmt/ilm/config.go +++ /dev/null @@ -1,74 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package ilm - -import ( - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/fmtstr" - "github.com/elastic/elastic-agent-libs/mapstr" -) - -// Config is used for unpacking a config.C. -type Config struct { - Enabled bool `config:"enabled"` - PolicyName fmtstr.EventFormatString `config:"policy_name"` - PolicyFile string `config:"policy_file"` - - // CheckExists can disable the check for an existing policy. Check required - // read_ilm privileges. If check is disabled the policy will only be - // installed if Overwrite is enabled. - CheckExists bool `config:"check_exists"` - - // Enable always overwrite policy mode. This required manage_ilm privileges. - Overwrite bool `config:"overwrite"` -} - -// DefaultPolicy defines the default policy to be used if no custom policy is -// configured. -// By default the policy contains not warm, cold, or delete phase. -// The index is configured to rollover every 50GB or after 30d. -var DefaultPolicy = mapstr.M{ - "policy": mapstr.M{ - "phases": mapstr.M{ - "hot": mapstr.M{ - "actions": mapstr.M{ - "rollover": mapstr.M{ - "max_primary_shard_size": "50gb", - "max_age": "30d", - }, - }, - }, - }, - }, -} - -// Validate verifies that expected config options are given and valid -func (cfg *Config) Validate() error { - return nil -} - -func defaultConfig(info beat.Info) Config { - policyFmt := fmtstr.MustCompileEvent(info.Beat) - - return Config{ - Enabled: true, - PolicyName: *policyFmt, - PolicyFile: "", - CheckExists: true, - } -} diff --git a/libbeat/idxmgmt/ilm/error.go b/libbeat/idxmgmt/ilm/error.go deleted file mode 100644 index b2e9830afd7..00000000000 --- a/libbeat/idxmgmt/ilm/error.go +++ /dev/null @@ -1,92 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package ilm - -import ( - "errors" - "fmt" -) - -// Error indicates an error + reason describing the last error. -// The Reason() method returns a sentinal error value for comparison. -type Error struct { - reason error - cause error - message string -} - -var ( - ErrESVersionNotSupported = errors.New("ILM is not supported by the Elasticsearch version in use") - ErrILMCheckRequestFailed = errors.New("request checking for ILM availability failed") - ErrInvalidResponse = errors.New("invalid response received") - ErrESILMDisabled = errors.New("ILM is disabled in Elasticsearch") - ErrRequestFailed = errors.New("request failed") - ErrOpNotAvailable = errors.New("operation not available") -) - -func errOf(reason error) error { - return &Error{reason: reason} -} - -func errf(reason error, msg string, vs ...interface{}) error { - return wrapErrf(nil, reason, msg, vs...) -} - -func wrapErr(cause, reason error) error { - return wrapErrf(cause, reason, "") -} - -func wrapErrf(cause, reason error, msg string, vs ...interface{}) error { - return &Error{ - cause: cause, - reason: reason, - message: fmt.Sprintf(msg, vs...), - } -} - -// ErrReason calls Reason() if the error implements this method. Otherwise return nil. -func ErrReason(err error) error { - if err == nil { - return nil - } - - ifc, ok := err.(interface{ Reason() error }) - if !ok { - return nil - } - return ifc.Reason() -} - -// Cause returns the errors cause, if present. -func (e *Error) Cause() error { return e.cause } - -// Reason returns a sentinal error value define within the ilm package. -func (e *Error) Reason() error { return e.reason } - -// Error returns the formatted error string. -func (e *Error) Error() string { - msg := e.message - if e.message == "" { - msg = e.reason.Error() - } - - if e.cause != nil { - return fmt.Sprintf("%v: %+v", msg, e.cause) - } - return msg -} diff --git a/libbeat/idxmgmt/ilm/ilm_test.go b/libbeat/idxmgmt/ilm/ilm_test.go deleted file mode 100644 index bb689cab943..00000000000 --- a/libbeat/idxmgmt/ilm/ilm_test.go +++ /dev/null @@ -1,233 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package ilm - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/mapstr" -) - -func TestDefaultSupport_Init(t *testing.T) { - info := beat.Info{Beat: "test", Version: "9.9.9"} - - t.Run("with custom config", func(t *testing.T) { - tmp, err := DefaultSupport(nil, info, config.MustNewConfigFrom( - map[string]interface{}{ - "enabled": true, - "name": "test-%{[agent.version]}", - "check_exists": false, - "overwrite": true, - }, - )) - require.NoError(t, err) - - s := tmp.(*stdSupport) - assert := assert.New(t) - assert.Equal(true, s.overwrite) - assert.Equal(false, s.checkExists) - assert.Equal(true, s.Enabled()) - assert.Equal(DefaultPolicy, mapstr.M(s.Policy().Body)) - }) - - t.Run("with custom alias config with fieldref", func(t *testing.T) { - tmp, err := DefaultSupport(nil, info, config.MustNewConfigFrom( - map[string]interface{}{ - "enabled": true, - "check_exists": false, - "overwrite": true, - }, - )) - require.NoError(t, err) - - s := tmp.(*stdSupport) - assert := assert.New(t) - assert.Equal(true, s.overwrite) - assert.Equal(false, s.checkExists) - assert.Equal(true, s.Enabled()) - assert.Equal(DefaultPolicy, mapstr.M(s.Policy().Body)) - }) - - t.Run("with default alias", func(t *testing.T) { - tmp, err := DefaultSupport(nil, info, config.MustNewConfigFrom( - map[string]interface{}{ - "enabled": true, - "pattern": "01", - "check_exists": false, - "overwrite": true, - }, - )) - require.NoError(t, err) - - s := tmp.(*stdSupport) - assert := assert.New(t) - assert.Equal(true, s.overwrite) - assert.Equal(false, s.checkExists) - assert.Equal(true, s.Enabled()) - assert.Equal(DefaultPolicy, mapstr.M(s.Policy().Body)) - }) - - t.Run("load external policy", func(t *testing.T) { - s, err := DefaultSupport(nil, info, config.MustNewConfigFrom( - mapstr.M{"policy_file": "testfiles/custom.json"}, - )) - require.NoError(t, err) - assert.Equal(t, mapstr.M{"hello": "world"}, s.Policy().Body) - }) -} - -func TestDefaultSupport_Manager_Enabled(t *testing.T) { - cases := map[string]struct { - calls []onCall - cfg map[string]interface{} - enabled bool - fail error - err bool - }{ - "disabled via config": { - cfg: map[string]interface{}{"enabled": false}, - }, - "disabled via handler": { - calls: []onCall{ - onCheckILMEnabled(true).Return(false, ErrESILMDisabled), - }, - err: true, - }, - "enabled via handler": { - calls: []onCall{ - onCheckILMEnabled(true).Return(true, nil), - }, - enabled: true, - }, - "handler confirms enabled flag": { - calls: []onCall{ - onCheckILMEnabled(true).Return(true, nil), - }, - cfg: map[string]interface{}{"enabled": true}, - enabled: true, - }, - "io error": { - calls: []onCall{ - onCheckILMEnabled(true).Return(false, errors.New("ups")), - }, - cfg: map[string]interface{}{}, - err: true, - }, - } - - for name, test := range cases { - t.Run(name, func(t *testing.T) { - cfg := test.cfg - if cfg == nil { - cfg = map[string]interface{}{} - } - - h := newMockHandler(test.calls...) - m := createManager(t, h, test.cfg) - enabled, err := m.CheckEnabled() - - if test.fail == nil && !test.err { - require.NoError(t, err) - } - if test.err || test.fail != nil { - require.Error(t, err) - } - if test.fail != nil { - assert.Equal(t, test.fail, ErrReason(err)) - } - - assert.Equal(t, test.enabled, enabled) - h.AssertExpectations(t) - }) - } -} - -func TestDefaultSupport_Manager_EnsurePolicy(t *testing.T) { - testPolicy := Policy{ - Name: "test", - Body: DefaultPolicy, - } - - cases := map[string]struct { - calls []onCall - overwrite bool - cfg map[string]interface{} - create bool - fail error - }{ - "create new policy": { - create: true, - calls: []onCall{ - onHasILMPolicy(testPolicy.Name).Return(false, nil), - onCreateILMPolicy(testPolicy).Return(nil), - }, - }, - "policy already exists": { - create: false, - calls: []onCall{ - onHasILMPolicy(testPolicy.Name).Return(true, nil), - }, - }, - "overwrite": { - overwrite: true, - create: true, - calls: []onCall{ - onCreateILMPolicy(testPolicy).Return(nil), - }, - }, - "fail": { - calls: []onCall{ - onHasILMPolicy(testPolicy.Name).Return(false, nil), - onCreateILMPolicy(testPolicy).Return(errOf(ErrRequestFailed)), - }, - fail: ErrRequestFailed, - }, - } - - for name, test := range cases { - test := test - t.Run(name, func(t *testing.T) { - h := newMockHandler(test.calls...) - m := createManager(t, h, test.cfg) - created, err := m.EnsurePolicy(test.overwrite) - - if test.fail == nil { - assert.Equal(t, test.create, created) - require.NoError(t, err) - } else { - require.Error(t, err) - assert.Equal(t, test.fail, ErrReason(err)) - } - - h.AssertExpectations(t) - }) - } -} - -func createManager(t *testing.T, h ClientHandler, cfg map[string]interface{}) Manager { - info := beat.Info{Beat: "test", Version: "9.9.9"} - s, err := DefaultSupport(nil, info, config.MustNewConfigFrom(cfg)) - require.NoError(t, err) - return s.Manager(h) -} diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/index_support.go similarity index 78% rename from libbeat/idxmgmt/std.go rename to libbeat/idxmgmt/index_support.go index f8169dcdf89..94fd3b07db0 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/index_support.go @@ -25,7 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outil" "github.com/elastic/beats/v7/libbeat/template" @@ -35,7 +35,7 @@ import ( type indexSupport struct { log *logp.Logger - ilm ilm.Supporter + ilm lifecycle.Supporter info beat.Info migration bool templateCfg template.TemplateConfig @@ -50,7 +50,7 @@ type indexState struct { type indexManager struct { support *indexSupport - ilm ilm.Manager + ilm lifecycle.Manager clientHandler ClientHandler assets Asseter @@ -95,23 +95,23 @@ func newFeature(c componentType, enabled, overwrite bool, mode LoadMode) feature func newIndexSupport( log *logp.Logger, info beat.Info, - ilmFactory ilm.SupportFactory, + ilmFactory lifecycle.SupportFactory, tmplConfig *config.C, - ilmConfig *config.C, + lifecyclesEnabled bool, migration bool, ) (*indexSupport, error) { if ilmFactory == nil { - ilmFactory = ilm.DefaultSupport + ilmFactory = lifecycle.DefaultSupport } - ilmSupporter, err := ilmFactory(log, info, ilmConfig) + ilmSupporter, err := ilmFactory(log, info, lifecyclesEnabled) if err != nil { - return nil, err + return nil, fmt.Errorf("error creating lifecycle supporter: %w", err) } tmplCfg, err := unpackTemplateConfig(info, tmplConfig) if err != nil { - return nil, err + return nil, fmt.Errorf("error unpacking template config: %w", err) } return &indexSupport{ @@ -207,14 +207,14 @@ func (s *indexSupport) BuildSelector(cfg *config.C) (outputs.IndexSelector, erro } // VerifySetup verifies the given feature setup, will return an error string if it detects something suspect -func (m *indexManager) VerifySetup(loadTemplate, loadILM LoadMode) (bool, string) { - ilmComponent := newFeature(componentILM, m.support.enabled(componentILM), m.support.ilm.Overwrite(), loadILM) +func (m *indexManager) VerifySetup(loadTemplate, loadLifecycle LoadMode) (bool, string) { + ilmComponent := newFeature(componentILM, m.support.enabled(componentILM), m.clientHandler.Overwrite(), loadLifecycle) templateComponent := newFeature(componentTemplate, m.support.enabled(componentTemplate), m.support.templateCfg.Overwrite, loadTemplate) if ilmComponent.load && !templateComponent.load { - return false, "Loading ILM policy without loading template is not recommended. Check your configuration." + return false, "Loading lifecycle policy without loading template is not recommended. Check your configuration." } if templateComponent.load && !ilmComponent.load && ilmComponent.enabled { @@ -225,13 +225,15 @@ func (m *indexManager) VerifySetup(loadTemplate, loadILM LoadMode) (bool, string var warn string if !ilmComponent.load { - warn += "ILM policy loading not enabled.\n" + warn += "lifecycle policy loading not enabled.\n" } else if !ilmComponent.overwrite { - warn += "Overwriting ILM policy is disabled. Set `setup.ilm.overwrite: true` for enabling.\n" + warn += "Overwriting lifecycle policy is disabled. Set `setup.ilm.overwrite: true` or `setup.dsl.overwrite: true` to overwrite.\n" } if !templateComponent.load { warn += "Template loading not enabled.\n" } + // remove last newline so we don't get weird formatting when this is printed to the console + warn = strings.TrimSuffix(warn, "\n") return warn == "", warn } @@ -244,15 +246,23 @@ func (m *indexManager) Setup(loadTemplate, loadILM LoadMode) error { return err } if withILM { - log.Info("Auto ILM enable success.") + log.Info("Auto lifecycle enable success.") } // create feature objects for ILM and template setup - ilmComponent := newFeature(componentILM, withILM, m.support.ilm.Overwrite(), loadILM) + ilmComponent := newFeature(componentILM, withILM, m.clientHandler.Overwrite(), loadILM) templateComponent := newFeature(componentTemplate, m.support.enabled(componentTemplate), m.support.templateCfg.Overwrite, loadTemplate) - if ilmComponent.load { + if m.clientHandler.Mode() == lifecycle.DSL { + log.Info("setting up DSL") + } + + // on DSL, the template load will create the lifecycle policy + // this is because the DSL API directly references the datastream, + // so the datastream must be created first under DSL + // If we're writing to a file, it doesn't matter + if ilmComponent.load && (m.clientHandler.Mode() == lifecycle.ILM || !m.clientHandler.IsElasticsearch()) { // install ilm policy policyCreated, err := m.ilm.EnsurePolicy(ilmComponent.overwrite) if err != nil { @@ -270,7 +280,7 @@ func (m *indexManager) Setup(loadTemplate, loadILM LoadMode) error { tmplCfg.Overwrite, tmplCfg.Enabled = templateComponent.overwrite, templateComponent.enabled if ilmComponent.enabled { - tmplCfg, err = applyILMSettingsToTemplate(log, tmplCfg, m.support.ilm.Policy()) + tmplCfg, err = applyLifecycleSettingsToTemplate(log, tmplCfg, m.clientHandler) if err != nil { return fmt.Errorf("error applying ILM settings: %w", err) } @@ -342,17 +352,17 @@ func unpackTemplateConfig(info beat.Info, cfg *config.C) (config template.Templa } // applies the specified ILM policy to the provided template, returns a struct of the template config -func applyILMSettingsToTemplate( +func applyLifecycleSettingsToTemplate( log *logp.Logger, tmpl template.TemplateConfig, - policy ilm.Policy, + policymgr lifecycle.ClientHandler, ) (template.TemplateConfig, error) { if !tmpl.Enabled { return tmpl, nil } - if policy.Name == "" { - return tmpl, errors.New("no ilm policy name configured") + if policymgr.PolicyName() == "" { + return tmpl, errors.New("no policy name configured") } // init/copy index settings @@ -368,23 +378,28 @@ func applyILMSettingsToTemplate( } tmpl.Settings.Index = idxSettings - // init/copy index.lifecycle settings - var lifecycle map[string]interface{} - if ifcLifecycle := idxSettings["lifecycle"]; ifcLifecycle == nil { - lifecycle = map[string]interface{}{} - } else if tmp, ok := ifcLifecycle.(map[string]interface{}); ok { - lifecycle = make(map[string]interface{}, len(tmp)) - for k, v := range tmp { - lifecycle[k] = v + if policymgr.Mode() == lifecycle.ILM { + // init/copy index.lifecycle settings + var lifecycle map[string]interface{} + if ifcLifecycle := idxSettings["lifecycle"]; ifcLifecycle == nil { + lifecycle = map[string]interface{}{} + } else if tmp, ok := ifcLifecycle.(map[string]interface{}); ok { + lifecycle = make(map[string]interface{}, len(tmp)) + for k, v := range tmp { + lifecycle[k] = v + } + } else { + return tmpl, errors.New("settings.index.lifecycle must be an object") } - } else { - return tmpl, errors.New("settings.index.lifecycle must be an object") - } - idxSettings["lifecycle"] = lifecycle + idxSettings["lifecycle"] = lifecycle - if _, exists := lifecycle["name"]; !exists { - log.Infof("Set settings.index.lifecycle.name in template to %s as ILM is enabled.", policy) - lifecycle["name"] = policy.Name + if _, exists := lifecycle["name"]; !exists { + log.Infof("Set settings.index.lifecycle.name in template to %s as ILM is enabled.", policymgr.PolicyName()) + lifecycle["name"] = policymgr.PolicyName() + } + } else { + // when we're in DSL mode, this is what actually creates the policy + tmpl.Settings.Lifecycle = policymgr.Policy().Body } return tmpl, nil diff --git a/libbeat/idxmgmt/lifecycle/client_handler.go b/libbeat/idxmgmt/lifecycle/client_handler.go new file mode 100644 index 00000000000..12d860cef8b --- /dev/null +++ b/libbeat/idxmgmt/lifecycle/client_handler.go @@ -0,0 +1,114 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "encoding/json" + "errors" + "fmt" + "os" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/version" +) + +// ClientHandler defines the interface between a remote service and the index Manager. +type ClientHandler interface { + CheckEnabled() (bool, error) + HasPolicy() (bool, error) + CreatePolicyFromConfig() error + PolicyName() string + Overwrite() bool + CheckExists() bool + Policy() Policy + Mode() Mode + IsElasticsearch() bool +} + +type VersionCheckerClient interface { + GetVersion() version.V +} + +// ESClient defines the minimal interface required for the Loader to +// prepare a policy. +type ESClient interface { + VersionCheckerClient + IsServerless() bool + Request( + method, path string, + pipeline string, + params map[string]string, + body interface{}, + ) (int, []byte, error) +} + +// FileClient defines the minimal interface required for the Loader to +// prepare a policy. +type FileClient interface { + GetVersion() version.V + Write(component string, name string, body string) error +} + +const ( + esILMPath = "/_ilm/policy" +) + +var ( + esMinDefaultILMVersion = version.MustNew("7.0.0") +) + +/// ============ generic helpers + +func checkILMEnabled(enabled bool, c VersionCheckerClient) (bool, error) { + if !enabled { + return false, nil + } + + ver := c.GetVersion() + if ver.LessThan(esMinDefaultILMVersion) { + return false, fmt.Errorf("%w: Elasticsearch %v does not support ILM", ErrESVersionNotSupported, ver.String()) + } + return true, nil +} + +func createPolicy(cfg Config, info beat.Info, defaultPolicy mapstr.M) (Policy, error) { + name, err := ApplyStaticFmtstr(info, cfg.PolicyName) + if err != nil { + return Policy{}, errors.New("failed to read ilm policy name") + } + + policy := Policy{ + Name: name, + Body: defaultPolicy, + } + if path := cfg.PolicyFile; path != "" { + contents, err := os.ReadFile(path) + if err != nil { + return Policy{}, fmt.Errorf("failed to read policy file '%s': %w", path, err) + } + + var body map[string]interface{} + if err := json.Unmarshal(contents, &body); err != nil { + return Policy{}, fmt.Errorf("failed to decode policy file '%s': %w", path, err) + } + + policy.Body = body + } + return policy, nil +} diff --git a/libbeat/idxmgmt/ilm/client_handler_integration_test.go b/libbeat/idxmgmt/lifecycle/client_handler_integration_test.go similarity index 62% rename from libbeat/idxmgmt/ilm/client_handler_integration_test.go rename to libbeat/idxmgmt/lifecycle/client_handler_integration_test.go index 3701b715692..2df919d55d5 100644 --- a/libbeat/idxmgmt/ilm/client_handler_integration_test.go +++ b/libbeat/idxmgmt/lifecycle/client_handler_integration_test.go @@ -17,10 +17,9 @@ //go:build integration -package ilm_test +package lifecycle import ( - "encoding/json" "fmt" "os" "testing" @@ -30,9 +29,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" - "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" "github.com/elastic/beats/v7/libbeat/version" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/transport/httpcommon" libversion "github.com/elastic/elastic-agent-libs/version" @@ -40,76 +40,94 @@ import ( const ( // ElasticsearchDefaultHost is the default host for elasticsearch. - ElasticsearchDefaultHost = "localhost" + ElasticsearchDefaultHost = "http://localhost" // ElasticsearchDefaultPort is the default port for elasticsearch. ElasticsearchDefaultPort = "9200" ) func TestESClientHandler_CheckILMEnabled(t *testing.T) { t.Run("no ilm if disabled", func(t *testing.T) { - h := newESClientHandler(t) - b, err := h.CheckILMEnabled(false) + cfg := RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": false, "policy_name": "test", "check_exists": true}), + DSL: config.MustNewConfigFrom(mapstr.M{"enabled": false, "data_stream_pattern": "%{[beat.name]}-%{[beat.version]}", "check_exists": true}), + } + h, err := newESClientHandler(t, cfg) + require.NoError(t, err) + b, err := h.CheckEnabled() assert.NoError(t, err) assert.False(t, b) }) t.Run("with ilm if enabled", func(t *testing.T) { - h := newESClientHandler(t) - b, err := h.CheckILMEnabled(true) + cfg := RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": true, "policy_name": "test", "check_exists": true}), + DSL: config.MustNewConfigFrom(mapstr.M{"enabled": false, "data_stream_pattern": "%{[beat.name]}-%{[beat.version]}", "check_exists": true}), + } + h, err := newESClientHandler(t, cfg) + require.NoError(t, err) + b, err := h.CheckEnabled() assert.NoError(t, err) assert.True(t, b) }) } func TestESClientHandler_ILMPolicy(t *testing.T) { - t.Run("does not exist", func(t *testing.T) { - name := makeName("esch-policy-no") - h := newESClientHandler(t) - b, err := h.HasILMPolicy(name) - assert.NoError(t, err) - assert.False(t, b) - }) t.Run("create new", func(t *testing.T) { - policy := ilm.Policy{ + policy := Policy{ Name: makeName("esch-policy-create"), - Body: ilm.DefaultPolicy, + Body: DefaultILMPolicy, } - h := newESClientHandler(t) - err := h.CreateILMPolicy(policy) + cfg := RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": true, "policy_name": "test", "check_exists": true}), + DSL: config.MustNewConfigFrom(mapstr.M{"enabled": false, "data_stream_pattern": "%{[beat.name]}-%{[beat.version]}", "check_exists": true}), + } + rawClient := newRawESClient(t) + h, err := NewESClientHandler(rawClient, beat.Info{Beat: "testbeat"}, cfg) + require.NoError(t, err) + h.cfg.policyRaw = &policy + + err = h.CreatePolicyFromConfig() require.NoError(t, err) - b, err := h.HasILMPolicy(policy.Name) + b, err := h.HasPolicy() assert.NoError(t, err) assert.True(t, b) }) t.Run("overwrite", func(t *testing.T) { - policy := ilm.Policy{ + policy := Policy{ Name: makeName("esch-policy-overwrite"), - Body: ilm.DefaultPolicy, + Body: DefaultILMPolicy, + } + cfg := RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": true, "policy_name": "test", "check_exists": true}), + DSL: config.MustNewConfigFrom(mapstr.M{"enabled": false, "data_stream_pattern": "%{[beat.name]}-%{[beat.version]}", "check_exists": true}), } - h := newESClientHandler(t) + rawClient := newRawESClient(t) + h, err := NewESClientHandler(rawClient, beat.Info{Beat: "testbeat"}, cfg) + require.NoError(t, err) + h.cfg.policyRaw = &policy - err := h.CreateILMPolicy(policy) + err = h.CreatePolicyFromConfig() require.NoError(t, err) // check second 'create' does not throw (assuming race with other beat) - err = h.CreateILMPolicy(policy) + err = h.CreatePolicyFromConfig() require.NoError(t, err) - b, err := h.HasILMPolicy(policy.Name) + b, err := h.HasPolicy() assert.NoError(t, err) assert.True(t, b) }) } -func newESClientHandler(t *testing.T) ilm.ClientHandler { +func newESClientHandler(t *testing.T, cfg RawConfig) (ClientHandler, error) { client := newRawESClient(t) - return ilm.NewESClientHandler(client) + return NewESClientHandler(client, beat.Info{Beat: "testbeat"}, cfg) } -func newRawESClient(t *testing.T) ilm.ESClient { +func newRawESClient(t *testing.T) ESClient { transport := httpcommon.DefaultHTTPTransportSettings() transport.Timeout = 60 * time.Second client, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ @@ -166,29 +184,32 @@ func getEnv(name, def string) string { } func TestFileClientHandler_CheckILMEnabled(t *testing.T) { + defaultCfg := RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": true, "policy_name": "test", "check_exists": true}), + DSL: config.MustNewConfigFrom(mapstr.M{"enabled": false, "data_stream_pattern": "%{[beat.name]}-%{[beat.version]}", "check_exists": true}), + } for name, test := range map[string]struct { - enabled bool version string ilmEnabled bool err bool + cfg RawConfig }{ "ilm enabled": { - enabled: true, + cfg: defaultCfg, + ilmEnabled: true, }, - "ilm disabled": { - enabled: false, - ilmEnabled: false, - }, + "ilm enabled, version too old": { - enabled: true, version: "5.0.0", err: true, + cfg: defaultCfg, }, } { t.Run(name, func(t *testing.T) { - h := ilm.NewFileClientHandler(newMockClient(test.version)) - b, err := h.CheckILMEnabled(test.enabled) + h, err := NewFileClientHandler(newMockClient(test.version), beat.Info{Beat: "test"}, test.cfg) + require.NoError(t, err) + b, err := h.CheckEnabled() assert.Equal(t, test.ilmEnabled, b) if test.err { assert.Error(t, err) @@ -199,20 +220,6 @@ func TestFileClientHandler_CheckILMEnabled(t *testing.T) { } } -func TestFileClientHandler_CreateILMPolicy(t *testing.T) { - c := newMockClient("") - h := ilm.NewFileClientHandler(c) - name := "test-policy" - body := mapstr.M{"foo": "bar"} - h.CreateILMPolicy(ilm.Policy{Name: name, Body: body}) - - assert.Equal(t, name, c.name) - assert.Equal(t, "policy", c.component) - var out mapstr.M - json.Unmarshal([]byte(c.body), &out) - assert.Equal(t, body, out) -} - type mockClient struct { v libversion.V component, name, body string diff --git a/libbeat/idxmgmt/lifecycle/config.go b/libbeat/idxmgmt/lifecycle/config.go new file mode 100644 index 00000000000..b25d5ecff6d --- /dev/null +++ b/libbeat/idxmgmt/lifecycle/config.go @@ -0,0 +1,132 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// Config is used for unpacking a config.C. +type Config struct { + Enabled bool `config:"enabled"` + // PolicyName, used by ILM + PolicyName fmtstr.EventFormatString `config:"policy_name"` + PolicyFile string `config:"policy_file"` + // used only for testing + policyRaw *Policy + + // CheckExists can disable the check for an existing policy. Check required + // read_ilm privileges. If check is disabled the policy will only be + // installed if Overwrite is enabled. + CheckExists bool `config:"check_exists"` + + // Enable always overwrite policy mode. This required manage_ilm privileges. + Overwrite bool `config:"overwrite"` +} + +// DSLNameConfig just stores the datastream name for the DSL policy +// as this is the only config value that differs between ILM and DSL +type DSLNameConfig struct { + DataStreamPattern fmtstr.EventFormatString `config:"data_stream_pattern"` +} + +func DefaultDSLName() DSLNameConfig { + return DSLNameConfig{ + DataStreamPattern: *fmtstr.MustCompileEvent("%{[beat.name]}-%{[beat.version]}"), + } +} + +// LifecycleConfig maps all possible ILM/DSL config values present in a config +type LifecycleConfig struct { + ILM Config `config:"setup.ilm"` + DSL Config `config:"setup.dsl"` +} + +// RawConfig half-unpacks the policy config, allowing us to tell if a user has explicitly +// enabled a given config value +type RawConfig struct { + ILM *config.C `config:"setup.ilm"` + DSL *config.C `config:"setup.dsl"` + TemplateName string `config:"setup.template.name"` +} + +// DefaultILMPolicy defines the default policy to be used if no custom policy is +// configured. +// By default the policy contains not warm, cold, or delete phase. +// The index is configured to rollover every 50GB or after 30d. +var DefaultILMPolicy = mapstr.M{ + "policy": mapstr.M{ + "phases": mapstr.M{ + "hot": mapstr.M{ + "actions": mapstr.M{ + "rollover": mapstr.M{ + "max_primary_shard_size": "50gb", + "max_age": "30d", + }, + }, + }, + }, + }, +} + +// DefaultDSLPolicy defines the default policy to be used for DSL if +// no custom policy is configured +var DefaultDSLPolicy = mapstr.M{ + "data_retention": "7d", +} + +// Validate verifies that expected config options are given and valid +func (cfg *Config) Validate() error { + return nil +} + +func DefaultILMConfig(info beat.Info) LifecycleConfig { + return LifecycleConfig{ + ILM: Config{ + Enabled: true, + PolicyName: *fmtstr.MustCompileEvent(info.Beat), + PolicyFile: "", + CheckExists: true, + }, + DSL: Config{ + Enabled: false, + PolicyName: *fmtstr.MustCompileEvent("%{[beat.name]}-%{[beat.version]}"), + CheckExists: true, + }, + } +} + +func DefaultDSLConfig(info beat.Info) LifecycleConfig { + return LifecycleConfig{ + ILM: Config{ + Enabled: false, + PolicyName: *fmtstr.MustCompileEvent(info.Beat), + PolicyFile: "", + CheckExists: true, + }, + DSL: Config{ + Enabled: true, + PolicyName: *fmtstr.MustCompileEvent("%{[beat.name]}-%{[beat.version]}"), + PolicyFile: "", + CheckExists: true, + }, + } +} diff --git a/libbeat/idxmgmt/lifecycle/error.go b/libbeat/idxmgmt/lifecycle/error.go new file mode 100644 index 00000000000..6568b909bbb --- /dev/null +++ b/libbeat/idxmgmt/lifecycle/error.go @@ -0,0 +1,31 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "errors" +) + +var ( + ErrESVersionNotSupported = errors.New("ILM is not supported by the Elasticsearch version in use") + ErrILMCheckRequestFailed = errors.New("request checking for ILM availability failed") + ErrInvalidResponse = errors.New("invalid response received") + ErrESILMDisabled = errors.New("ILM is disabled in Elasticsearch") + ErrRequestFailed = errors.New("request failed") + ErrOpNotAvailable = errors.New("operation not available, no lifecycle manager is enabled") +) diff --git a/libbeat/idxmgmt/lifecycle/es_client_handler.go b/libbeat/idxmgmt/lifecycle/es_client_handler.go new file mode 100644 index 00000000000..c9077547a5b --- /dev/null +++ b/libbeat/idxmgmt/lifecycle/es_client_handler.go @@ -0,0 +1,214 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "errors" + "fmt" + "net/http" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// ESClientHandler implements the Loader interface for talking to ES. +type ESClientHandler struct { + client ESClient + info beat.Info + cfg Config + defaultPolicy mapstr.M + putPath string + name string + policy Policy + mode Mode +} + +// NewESClientHandler initializes and returns an ESClientHandler +func NewESClientHandler(c ESClient, info beat.Info, cfg RawConfig) (*ESClientHandler, error) { + if !cfg.DSL.Enabled() && cfg.ILM.Enabled() && c.IsServerless() { + return nil, fmt.Errorf("ILM is enabled/configured but %s is connected to a serverless instance; ILM isn't supported on Serverless Elasticsearch. Configure DSL or set setup.ilm.enabled to false", info.Beat) + } + + if !cfg.ILM.Enabled() && cfg.DSL.Enabled() && !c.IsServerless() { + return nil, fmt.Errorf("DSL is enabled/configured but %s is connected to a stateful instance; DSL is only supported on Serverless Elasticsearch. Configure ILM or set setup.dsl.enabled to false", info.Beat) + } + + if cfg.ILM.Enabled() && cfg.DSL.Enabled() { + return nil, fmt.Errorf("only one lifecycle management type can be used, but both ILM and DSL are enabled") + } + + // set default based on ES connection, then unpack user config, if set + lifecycleCfg := Config{} + var err error + if c.IsServerless() { + lifecycleCfg = DefaultDSLConfig(info).DSL + if cfg.DSL != nil { + err = cfg.DSL.Unpack(&lifecycleCfg) + } + + // unpack name value separately + dsName := DefaultDSLName() + err := cfg.DSL.Unpack(&dsName) + if err != nil { + return nil, fmt.Errorf("error unpacking DSL data stream name: %w", err) + } + lifecycleCfg.PolicyName = dsName.DataStreamPattern + + } else { + lifecycleCfg = DefaultILMConfig(info).ILM + if cfg.ILM != nil { + err = cfg.ILM.Unpack(&lifecycleCfg) + } + } + if err != nil { + return nil, fmt.Errorf("error unpacking lifecycle config: %w", err) + } + + // create name and policy + name, err := ApplyStaticFmtstr(info, lifecycleCfg.PolicyName) + if err != nil { + return nil, fmt.Errorf("error applying format string to policy name: %w", err) + } + if name == "" && lifecycleCfg.Enabled { + return nil, errors.New("could not generate usable policy name from config. Check setup.*.policy_name fields") + } + // deal with conflicts between policy name and template name + // under serverless, it doesn't make sense to have a policy name that differs from the template name + // if the user has set both to different values, throw a warning, as overwrite operations will probably fail + if c.IsServerless() { + if cfg.TemplateName != "" && cfg.TemplateName != name { + logp.L().Warnf("policy name is %s but template name is %s; under serverless, non-default template and policy names should be the same. Updates & overwrites may not work.") + } + } + + // set defaults + defaultPolicy := DefaultILMPolicy + mode := ILM + path := fmt.Sprintf("%s/%s", esILMPath, name) + + if c.IsServerless() { + defaultPolicy = DefaultDSLPolicy + mode = DSL + path = fmt.Sprintf("/_data_stream/%s/_lifecycle", name) + } + + var policy Policy + if lifecycleCfg.Enabled { // these are-enabled checks should happen elsewhere, but check again here just in case + policy, err = createPolicy(lifecycleCfg, info, defaultPolicy) + if err != nil { + return nil, fmt.Errorf("error creating a lifecycle policy: %w", err) + } + } + + return &ESClientHandler{client: c, + info: info, cfg: lifecycleCfg, + defaultPolicy: defaultPolicy, name: name, putPath: path, policy: policy, mode: mode}, nil +} + +// CheckExists returns the value of the check_exists config flag +func (h *ESClientHandler) CheckExists() bool { + return h.cfg.CheckExists +} + +// Overwrite returns the value of the overwrite config flag +func (h *ESClientHandler) Overwrite() bool { + return h.cfg.Overwrite +} + +// CheckEnabled indicates whether or not ILM is supported for the configured mode and ES instance. +func (h *ESClientHandler) CheckEnabled() (bool, error) { + return checkILMEnabled(h.cfg.Enabled, h.client) +} + +func (h *ESClientHandler) IsElasticsearch() bool { + return true +} + +// HasPolicy queries Elasticsearch to see if policy with given name exists. +func (h *ESClientHandler) HasPolicy() (bool, error) { + status, b, err := h.client.Request("GET", h.putPath, "", nil, nil) + if err != nil && status != http.StatusNotFound { + return false, fmt.Errorf("%w: failed to check for policy name '%v': (status=%v) (err=%w) %s", + ErrRequestFailed, h.name, status, err, b) + } + return status == http.StatusOK, nil +} + +// CreatePolicyFromConfig creates a DSL policy from a raw setup config for the beat +func (h *ESClientHandler) CreatePolicyFromConfig() error { + // check overwrite before we do this + // normally other upstream components do this check, + // but might as well do it here + if !h.cfg.Overwrite { + found, err := h.HasPolicy() + if err != nil { + return fmt.Errorf("error looking for existing policy: %w", err) + } + // maintain old behavior, don't return an error + if found { + return nil + } + } + // only applicable to testing + if h.cfg.policyRaw != nil { + return h.putPolicyToES(h.putPath, *h.cfg.policyRaw) + } + + err := h.createAndPutPolicy(h.cfg, h.info) + if err != nil { + return fmt.Errorf("error creating policy from config: %w", err) + } + return nil +} + +// PolicyName returns the policy name +func (h *ESClientHandler) PolicyName() string { + return h.name +} + +// Policy returns the full policy +func (h *ESClientHandler) Policy() Policy { + return h.policy +} + +// Mode returns the connected instance mode +func (h *ESClientHandler) Mode() Mode { + return h.mode +} + +// creates a policy from config, then performs the PUT request to ES +func (h *ESClientHandler) createAndPutPolicy(cfg Config, info beat.Info) error { + err := h.putPolicyToES(h.putPath, h.policy) + if err != nil { + return fmt.Errorf("error submitting policy: %w", err) + } + return nil +} + +// performs the PUT operation to create a policy +func (h *ESClientHandler) putPolicyToES(path string, policy Policy) error { + retCode, resp, err := h.client.Request("PUT", path, "", nil, policy.Body) + if retCode >= http.StatusMultipleChoices { + return fmt.Errorf("error creating lifecycle policy: got %d from elasticsearch: %s", retCode, resp) + } + if err != nil { + return fmt.Errorf("error in lifecycle PUT request: %w", err) + } + return nil +} diff --git a/libbeat/idxmgmt/lifecycle/es_client_test.go b/libbeat/idxmgmt/lifecycle/es_client_test.go new file mode 100644 index 00000000000..aa6c55f274f --- /dev/null +++ b/libbeat/idxmgmt/lifecycle/es_client_test.go @@ -0,0 +1,201 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/version" +) + +type mockESClient struct { + serverless bool + hasPolicy bool + foundPolicy interface{} +} + +func (client *mockESClient) GetVersion() version.V { + return *version.MustNew("8.10.1") +} + +func (client *mockESClient) IsServerless() bool { + return client.serverless +} + +func (client *mockESClient) Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) { + if method == "PUT" { + client.foundPolicy = body + } + + if method == "GET" { + if client.hasPolicy || client.foundPolicy != nil { + return http.StatusOK, []byte{}, nil + } else { + return http.StatusNotFound, []byte{}, nil + } + } + + return http.StatusCreated, []byte{}, nil +} + +func TestESSetup(t *testing.T) { + info := beat.Info{Beat: "test", Version: "9.9.9"} + + defaultILMCfg := RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": true, "policy_name": "test", "check_exists": true}), + DSL: config.MustNewConfigFrom(mapstr.M{"enabled": false, "data_stream_pattern": "%{[beat.name]}-%{[beat.version]}", "check_exists": true}), + } + + defaultDSLCfg := RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": false, "policy_name": "test", "check_exists": true}), + DSL: config.MustNewConfigFrom(mapstr.M{"enabled": true, "data_stream_pattern": "%{[beat.name]}-%{[beat.version]}", "check_exists": true}), + } + + bothDisabledConfig := RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": false, "policy_name": "test", "check_exists": true}), + DSL: config.MustNewConfigFrom(mapstr.M{"enabled": false, "data_stream_pattern": "%{[beat.name]}-%{[beat.version]}", "check_exists": true}), + } + + bothEnabledConfig := RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": true, "policy_name": "test", "check_exists": true}), + DSL: config.MustNewConfigFrom(mapstr.M{"enabled": true, "data_stream_pattern": "%{[beat.name]}-%{[beat.version]}", "check_exists": true}), + } + + cases := map[string]struct { + serverless bool + serverHasPolicy bool + cfg RawConfig + err bool + expectedPUTPath string + expectedName string + expectedPolicy interface{} + existingPolicy interface{} + }{ + "serverless-with-correct-defaults": { + serverless: true, + cfg: defaultDSLCfg, + err: false, + expectedPUTPath: "/_data_stream/test-9.9.9/_lifecycle", + expectedName: "test-9.9.9", + expectedPolicy: DefaultDSLPolicy, + }, + "stateful-with-correct-default": { + serverless: false, + cfg: defaultILMCfg, + err: false, + expectedPUTPath: "/_ilm/policy/test", + expectedName: "test", + expectedPolicy: DefaultILMPolicy, + }, + "serverless-with-wrong-defaults": { + serverless: true, + cfg: defaultILMCfg, + err: true, + }, + "stateful-with-wrong-defaults": { + serverless: false, + cfg: defaultDSLCfg, + err: true, + }, + "serverless-with-both-enabled": { + serverless: true, + cfg: bothEnabledConfig, + err: true, + }, + "stateful-with-both-enabled": { + serverless: false, + cfg: bothEnabledConfig, + err: true, + }, + "custom-policy-name": { + serverless: false, + cfg: RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": false, "policy_name": "test-%{[beat.version]}", "check_exists": true}), + }, + err: false, + expectedName: "test-9.9.9", + }, + "custom-policy-file": { + serverless: false, + cfg: RawConfig{ + ILM: config.MustNewConfigFrom(mapstr.M{"enabled": true, + "policy_name": "test", + "policy_file": "./testfiles/custom.json", + "check_exists": true}), + }, + expectedPolicy: mapstr.M{"hello": "world"}, + err: false, + }, + "do-not-overwrite": { + serverless: true, + cfg: defaultDSLCfg, + err: false, + existingPolicy: mapstr.M{"existing": "policy"}, + expectedPolicy: mapstr.M{"existing": "policy"}, + }, + "do-overwrite": { + serverless: true, + cfg: RawConfig{ + DSL: config.MustNewConfigFrom(mapstr.M{"enabled": true, "overwrite": true, + "check_exists": true, "data_stream_pattern": "test"}), + }, + err: false, + existingPolicy: mapstr.M{"existing": "policy"}, + expectedPolicy: DefaultDSLPolicy, + }, + "all-disabled-no-fail": { + serverless: false, + cfg: bothDisabledConfig, + err: false, + }, + "all-disabled-no-fail-serverless": { + serverless: true, + cfg: bothDisabledConfig, + err: false, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + client := &mockESClient{serverless: test.serverless, foundPolicy: test.existingPolicy} + gotClient, err := NewESClientHandler(client, info, test.cfg) + if test.err { + require.Error(t, err, "expected an error") + } else { + require.NoError(t, err, "no error expected") + } + if test.expectedPUTPath != "" { + require.Equal(t, test.expectedPUTPath, gotClient.putPath, "URLs are not the same") + } + if test.expectedName != "" { + require.Equal(t, test.expectedName, gotClient.name, "policy names are not equal") + } + if test.expectedPolicy != nil { + err := gotClient.CreatePolicyFromConfig() + require.NoError(t, err) + require.Equal(t, test.expectedPolicy, client.foundPolicy, "found policies are not equal") + } + }) + } +} diff --git a/libbeat/idxmgmt/lifecycle/file_client_handler.go b/libbeat/idxmgmt/lifecycle/file_client_handler.go new file mode 100644 index 00000000000..53e4030944c --- /dev/null +++ b/libbeat/idxmgmt/lifecycle/file_client_handler.go @@ -0,0 +1,158 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// FileClientHandler implements the Loader interface for writing to a file. +type FileClientHandler struct { + client FileClient + info beat.Info + cfg Config + defaultPolicy mapstr.M + name string + mode Mode + policy Policy +} + +// NewFileClientHandler initializes and returns a new FileClientHandler instance. +func NewFileClientHandler(c FileClient, info beat.Info, cfg RawConfig) (*FileClientHandler, error) { + // half-unpack to distinguish between a config section that's been explicitly enabled, + // that way we can set a proper default + + if cfg.DSL.Enabled() && cfg.ILM.Enabled() { + return nil, fmt.Errorf("only one lifecycle management type can be used, but both ILM and DSL are enabled") + } + + // default to ILM if no configs are set + lifecycleCfg := DefaultILMConfig(info).ILM + var err error + if cfg.DSL.Enabled() { + lifecycleCfg = DefaultDSLConfig(info).DSL + err = cfg.DSL.Unpack(&lifecycleCfg) + + // unpack name value separately + dsName := DefaultDSLName() + err := cfg.DSL.Unpack(&dsName) + if err != nil { + return nil, fmt.Errorf("error unpacking DSL data stream name: %w", err) + } + lifecycleCfg.PolicyName = dsName.DataStreamPattern + } else if cfg.ILM.Enabled() { + lifecycleCfg = DefaultILMConfig(info).ILM + err = cfg.ILM.Unpack(&lifecycleCfg) + } else { + logp.L().Infof("No lifecycle config has been explicitly enabled, defauling to ILM") + } + + if err != nil { + return nil, fmt.Errorf("error unpacking config: %w", err) + } + + name, err := ApplyStaticFmtstr(info, lifecycleCfg.PolicyName) + if err != nil { + return nil, fmt.Errorf("error creating policy name: %w", err) + } + + // set defaults + defaultPolicy := DefaultILMPolicy + mode := ILM + + if cfg.DSL.Enabled() { + defaultPolicy = DefaultDSLPolicy + mode = DSL + } + + policy, err := createPolicy(lifecycleCfg, info, defaultPolicy) + if err != nil { + return nil, fmt.Errorf("error creating policy: %w", err) + } + + return &FileClientHandler{client: c, info: info, cfg: lifecycleCfg, + defaultPolicy: defaultPolicy, name: name, policy: policy, mode: mode}, nil + +} + +// CheckExists returns the state of the check_exists config flag +func (h *FileClientHandler) CheckExists() bool { + return h.cfg.CheckExists +} + +// Overwrite returns the state of the overwrite config flag +func (h *FileClientHandler) Overwrite() bool { + return h.cfg.Enabled +} + +// CheckEnabled indicates whether or not lifecycle management is supported for the configured mode and client version. +func (h *FileClientHandler) CheckEnabled() (bool, error) { + return checkILMEnabled(h.cfg.Enabled, h.client) +} + +// CreatePolicy writes given policy to the configured file. +func (h *FileClientHandler) CreatePolicy(policy Policy) error { + str := fmt.Sprintf("%s\n", policy.Body.StringToPrint()) + if err := h.client.Write("policy", policy.Name, str); err != nil { + return fmt.Errorf("error printing policy : %w", err) + } + return nil +} + +// Policy returns the complete policy +func (h *FileClientHandler) Policy() Policy { + return h.policy +} + +// Mode returns the configured instance mode +func (h *FileClientHandler) Mode() Mode { + return h.mode +} + +// IsElasticsearch returns false +func (h *FileClientHandler) IsElasticsearch() bool { + return false +} + +// CreatePolicyFromConfig creates a lifecycle policy from its config and posts it to elasticsearch +func (h *FileClientHandler) CreatePolicyFromConfig() error { + // only applicable to testing + if h.cfg.policyRaw != nil { + return h.CreatePolicy(*h.cfg.policyRaw) + } + + err := h.CreatePolicy(h.policy) + if err != nil { + return fmt.Errorf("error writing policy: %w", err) + } + return nil +} + +// PolicyName returns the generated policy name. +func (h *FileClientHandler) PolicyName() string { + return h.name +} + +// HasPolicy always returns false. +func (h *FileClientHandler) HasPolicy() (bool, error) { + return false, nil +} diff --git a/libbeat/idxmgmt/ilm/ilm.go b/libbeat/idxmgmt/lifecycle/ilm.go similarity index 62% rename from libbeat/idxmgmt/ilm/ilm.go rename to libbeat/idxmgmt/lifecycle/ilm.go index bd67c186f21..a231db2a6f3 100644 --- a/libbeat/idxmgmt/ilm/ilm.go +++ b/libbeat/idxmgmt/lifecycle/ilm.go @@ -15,38 +15,37 @@ // specific language governing permissions and limitations // under the License. -package ilm +package lifecycle import ( - "encoding/json" - "errors" - "fmt" - "io/ioutil" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/fmtstr" - "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) // SupportFactory is used to define a policy type to be used. -type SupportFactory func(*logp.Logger, beat.Info, *config.C) (Supporter, error) +type SupportFactory func(*logp.Logger, beat.Info, bool) (Supporter, error) // Supporter implements ILM support. For loading the policies // a manager instance must be generated. type Supporter interface { // Query settings Enabled() bool - Policy() Policy - Overwrite() bool - // Manager creates a new Manager instance for checking and installing // resources. Manager(h ClientHandler) Manager } +type Mode int + +const ( + ILM Mode = iota + DSL +) + // Manager uses a ClientHandler to install a policy. type Manager interface { CheckEnabled() (bool, error) @@ -66,69 +65,33 @@ type Policy struct { } // DefaultSupport configures a new default ILM support implementation. -func DefaultSupport(log *logp.Logger, info beat.Info, c *config.C) (Supporter, error) { - cfg := defaultConfig(info) - if c != nil { - if err := c.Unpack(&cfg); err != nil { - return nil, err - } +func DefaultSupport(log *logp.Logger, info beat.Info, lifecycleEnabled bool) (Supporter, error) { + if !lifecycleEnabled { + return NewNoopSupport(info, lifecycleEnabled) } - if !cfg.Enabled { - return NewNoopSupport(info, c) - } - - return StdSupport(log, info, c) + return StdSupport(log, info, lifecycleEnabled) } // StdSupport configures a new std ILM support implementation. -func StdSupport(log *logp.Logger, info beat.Info, c *config.C) (Supporter, error) { +func StdSupport(log *logp.Logger, info beat.Info, lifecycleEnabled bool) (Supporter, error) { if log == nil { log = logp.NewLogger("ilm") } else { log = log.Named("ilm") } - cfg := defaultConfig(info) - if c != nil { - if err := c.Unpack(&cfg); err != nil { - return nil, err - } - } - - name, err := applyStaticFmtstr(info, &cfg.PolicyName) - if err != nil { - return nil, errors.New("failed to read ilm policy name") - } - - policy := Policy{ - Name: name, - Body: DefaultPolicy, - } - if path := cfg.PolicyFile; path != "" { - contents, err := ioutil.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("failed to read policy file '%v': %w", path, err) - } - - var body map[string]interface{} - if err := json.Unmarshal(contents, &body); err != nil { - return nil, fmt.Errorf("failed to decode policy file '%v': %w", path, err) - } - - policy.Body = body - } - - return NewStdSupport(log, cfg.Enabled, policy, cfg.Overwrite, cfg.CheckExists), nil + return NewStdSupport(log, lifecycleEnabled), nil } // NoopSupport configures a new noop ILM support implementation, // should be used when ILM is disabled -func NoopSupport(_ *logp.Logger, info beat.Info, c *config.C) (Supporter, error) { +func NoopSupport(_ *logp.Logger, info beat.Info, c bool) (Supporter, error) { return NewNoopSupport(info, c) } -func applyStaticFmtstr(info beat.Info, fmt *fmtstr.EventFormatString) (string, error) { +// ApplyStaticFmtstr applies the beat info to the given format string +func ApplyStaticFmtstr(info beat.Info, fmt fmtstr.EventFormatString) (string, error) { return fmt.Run( &beat.Event{ Fields: fmtstr.FieldsForBeat(info.Beat, info.Version), diff --git a/libbeat/idxmgmt/lifecycle/ilm_test.go b/libbeat/idxmgmt/lifecycle/ilm_test.go new file mode 100644 index 00000000000..24df5ae7bbe --- /dev/null +++ b/libbeat/idxmgmt/lifecycle/ilm_test.go @@ -0,0 +1,220 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" +) + +func TestDefaultSupport_Init(t *testing.T) { + info := beat.Info{Beat: "test", Version: "9.9.9"} + + t.Run("with custom config", func(t *testing.T) { + tmp, err := DefaultSupport(nil, info, true) + require.NoError(t, err) + + s := tmp.(*stdSupport) + assert := assert.New(t) + assert.True(s.Enabled()) + }) + +} + +func TestDefaultSupport_Manager_Enabled_Serverless(t *testing.T) { + cfg := DefaultDSLConfig(beat.Info{Name: "test"}) + runEnabledTests(t, cfg) +} + +func TestDefaultSupport_Manager_Enabled(t *testing.T) { + cfg := DefaultILMConfig(beat.Info{Name: "test"}) + runEnabledTests(t, cfg) +} + +func runEnabledTests(t *testing.T, cfg LifecycleConfig) { + cases := map[string]struct { + calls []onCall + cfg LifecycleConfig + expectEnabled bool + isEnabled bool + fail error + err bool + }{ + "disabled via config": { + cfg: LifecycleConfig{ILM: Config{Enabled: false}, DSL: Config{Enabled: false}}, + expectEnabled: false, + isEnabled: false, + }, + "disabled via handler": { + calls: []onCall{ + onCheckEnabled().Return(false, ErrESILMDisabled), + }, + expectEnabled: false, + isEnabled: true, + cfg: cfg, + err: true, + }, + "enabled via handler": { + calls: []onCall{ + onCheckEnabled().Return(true, nil), + }, + expectEnabled: true, + isEnabled: true, + cfg: cfg, + }, + "handler confirms enabled flag": { + calls: []onCall{ + onCheckEnabled().Return(true, nil), + }, + cfg: cfg, + expectEnabled: true, + isEnabled: true, + }, + "io error": { + calls: []onCall{ + onCheckEnabled().Return(false, errors.New("ups")), + }, + cfg: cfg, + expectEnabled: false, + isEnabled: true, + err: true, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + + testHandler := newMockHandler(test.cfg, Policy{}, test.calls...) + testManager := createManager(t, testHandler, test.isEnabled) + enabled, err := testManager.CheckEnabled() + + if test.fail == nil && !test.err { + assert.NoError(t, err) + } + if test.err || test.fail != nil { + assert.Error(t, err) + } + if test.fail != nil { + assert.Equal(t, test.fail, err) + } + + assert.Equal(t, test.expectEnabled, enabled) + testHandler.AssertExpectations(t) + }) + } +} + +func TestDefaultSupport_Manager_EnsurePolicy_Serverless(t *testing.T) { + testPolicy := Policy{ + Name: "test", + Body: DefaultDSLPolicy, + } + cfg := DefaultDSLConfig(beat.Info{Name: "test"}) + runEnsurePolicyTest(t, testPolicy, cfg) +} + +func TestDefaultSupport_Manager_EnsurePolicy(t *testing.T) { + testPolicy := Policy{ + Name: "test", + Body: DefaultILMPolicy, + } + cfg := DefaultILMConfig(beat.Info{Name: "test"}) + runEnsurePolicyTest(t, testPolicy, cfg) +} + +func runEnsurePolicyTest(t *testing.T, testPolicy Policy, cfg LifecycleConfig) { + cases := map[string]struct { + calls []onCall + overwrite bool + cfg LifecycleConfig + enabled bool + create bool + fail error + }{ + "create new policy": { + create: true, + calls: []onCall{ + onCheckExists().Return(true), + onHasPolicy().Return(false, nil), + onCreatePolicyFromConfig().Return(nil), + }, + cfg: cfg, + enabled: true, + }, + "policy already exists": { + create: false, + calls: []onCall{ + onCheckExists().Return(true), + onHasPolicy().Return(true, nil), + }, + cfg: cfg, + enabled: true, + }, + "overwrite": { + overwrite: true, + create: true, + enabled: true, + cfg: cfg, + calls: []onCall{ + onCheckExists().Return(true), + onCreatePolicyFromConfig().Return(nil), + }, + }, + "fail": { + calls: []onCall{ + onCheckExists().Return(true), + onHasPolicy().Return(false, nil), + onCreatePolicyFromConfig().Return(ErrRequestFailed), + }, + fail: ErrRequestFailed, + cfg: cfg, + enabled: true, + }, + } + + for name, test := range cases { + test := test + t.Run(name, func(t *testing.T) { + h := newMockHandler(test.cfg, testPolicy, test.calls...) + m := createManager(t, h, test.enabled) + created, err := m.EnsurePolicy(test.overwrite) + + if test.fail == nil { + assert.Equal(t, test.create, created) + require.NoError(t, err) + } else { + require.Error(t, err) + assert.Equal(t, test.fail, err) + } + + h.AssertExpectations(t) + }) + } +} + +func createManager(t *testing.T, h ClientHandler, enabled bool) Manager { + info := beat.Info{Beat: "test", Version: "9.9.9"} + s, err := DefaultSupport(nil, info, enabled) + require.NoError(t, err) + return s.Manager(h) +} diff --git a/libbeat/idxmgmt/ilm/mockapihandler_test.go b/libbeat/idxmgmt/lifecycle/mock_client_handler.go similarity index 54% rename from libbeat/idxmgmt/ilm/mockapihandler_test.go rename to libbeat/idxmgmt/lifecycle/mock_client_handler.go index b527ad118d7..c9309a6db92 100644 --- a/libbeat/idxmgmt/ilm/mockapihandler_test.go +++ b/libbeat/idxmgmt/lifecycle/mock_client_handler.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package ilm +package lifecycle import ( "github.com/stretchr/testify/mock" @@ -23,6 +23,9 @@ import ( type mockHandler struct { mock.Mock + cfg LifecycleConfig + testPolicy Policy + mode Mode } type onCall struct { @@ -36,30 +39,57 @@ func (c onCall) Return(values ...interface{}) onCall { return c } -func newMockHandler(calls ...onCall) *mockHandler { - m := &mockHandler{} +func newMockHandler(cfg LifecycleConfig, testPolicy Policy, calls ...onCall) *mockHandler { + m := &mockHandler{cfg: cfg} for _, c := range calls { m.On(c.name, c.args...).Return(c.returns...) } return m } -func onCheckILMEnabled(enabled bool) onCall { return makeOnCall("CheckILMEnabled", enabled) } -func (h *mockHandler) CheckILMEnabled(enabled bool) (bool, error) { - args := h.Called(enabled) +func onCheckEnabled() onCall { return makeOnCall("CheckEnabled") } +func (h *mockHandler) CheckEnabled() (bool, error) { + args := h.Called() return args.Bool(0), args.Error(1) } -func onHasILMPolicy(name string) onCall { return makeOnCall("HasILMPolicy", name) } -func (h *mockHandler) HasILMPolicy(name string) (bool, error) { - args := h.Called(name) +func onHasPolicy() onCall { return makeOnCall("HasPolicy") } +func (h *mockHandler) HasPolicy() (bool, error) { + args := h.Called() return args.Bool(0), args.Error(1) } -func onCreateILMPolicy(policy Policy) onCall { return makeOnCall("CreateILMPolicy", policy) } -func (h *mockHandler) CreateILMPolicy(policy Policy) error { - args := h.Called(policy) +func onCreatePolicyFromConfig() onCall { return makeOnCall("CreatePolicyFromConfig") } +func (h *mockHandler) CreatePolicyFromConfig() error { + args := h.Called() return args.Error(0) + +} + +func (h *mockHandler) Overwrite() bool { + return h.cfg.ILM.Overwrite || h.cfg.DSL.Overwrite +} + +func (h *mockHandler) PolicyName() string { + return h.testPolicy.Name +} + +func (h *mockHandler) Policy() Policy { + return h.testPolicy +} + +func (h *mockHandler) Mode() Mode { + return h.mode +} + +func (h *mockHandler) IsElasticsearch() bool { + return false +} + +func onCheckExists() onCall { return makeOnCall("CheckExists") } +func (h *mockHandler) CheckExists() bool { + args := h.Called() + return args.Bool(0) } func makeOnCall(name string, args ...interface{}) onCall { diff --git a/libbeat/idxmgmt/ilm/noop.go b/libbeat/idxmgmt/lifecycle/noop_manager.go similarity index 69% rename from libbeat/idxmgmt/ilm/noop.go rename to libbeat/idxmgmt/lifecycle/noop_manager.go index 66731f257b7..6311d0dd2bb 100644 --- a/libbeat/idxmgmt/ilm/noop.go +++ b/libbeat/idxmgmt/lifecycle/noop_manager.go @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -package ilm +package lifecycle import ( "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/elastic-agent-libs/config" ) type noopSupport struct{} @@ -27,14 +26,27 @@ type noopManager struct{} // NewNoopSupport creates a noop ILM implementation with ILM support being always // disabled. Attempts to install a policy will fail. -func NewNoopSupport(info beat.Info, config *config.C) (Supporter, error) { +func NewNoopSupport(info beat.Info, c bool) (Supporter, error) { return (*noopSupport)(nil), nil } -func (*noopSupport) Enabled() bool { return false } -func (*noopSupport) Policy() Policy { return Policy{} } -func (*noopSupport) Overwrite() bool { return false } +// Enabled no-op +func (*noopSupport) Enabled() bool { return false } + +// Policy no-op +func (*noopSupport) Policy() Policy { return Policy{} } + +// Overwrite no-op +func (*noopSupport) Overwrite() bool { return false } + +// Manager no-op func (*noopSupport) Manager(_ ClientHandler) Manager { return (*noopManager)(nil) } -func (*noopManager) CheckEnabled() (bool, error) { return false, nil } -func (*noopManager) EnsurePolicy(_ bool) (bool, error) { return false, errOf(ErrOpNotAvailable) } +// CheckEnabled no-op +func (*noopManager) CheckEnabled() (bool, error) { return false, nil } + +// EnsurePolicy no-op +func (*noopManager) EnsurePolicy(_ bool) (bool, error) { return false, ErrOpNotAvailable } + +// Policyname no-op +func (*noopManager) PolicyName() string { return "" } diff --git a/libbeat/idxmgmt/ilm/std.go b/libbeat/idxmgmt/lifecycle/standard_manager.go similarity index 53% rename from libbeat/idxmgmt/ilm/std.go rename to libbeat/idxmgmt/lifecycle/standard_manager.go index 77a7825ae79..db3d5595afd 100644 --- a/libbeat/idxmgmt/ilm/std.go +++ b/libbeat/idxmgmt/lifecycle/standard_manager.go @@ -15,32 +15,29 @@ // specific language governing permissions and limitations // under the License. -package ilm +package lifecycle import ( + "fmt" "time" "github.com/elastic/elastic-agent-libs/logp" ) +// stdSupport is a config wrapper that carries lifecycle info. type stdSupport struct { - log *logp.Logger - - enabled bool - overwrite bool - checkExists bool - - policy Policy + log *logp.Logger + lifecycleEnabled bool } +// stdManager creates, checks, and updates lifecycle policies. type stdManager struct { *stdSupport client ClientHandler - - // cached info - cache infoCache + cache infoCache } +// infoCache stores config relating to caching lifecycle config type infoCache struct { LastUpdate time.Time Enabled bool @@ -49,25 +46,23 @@ type infoCache struct { var defaultCacheDuration = 5 * time.Minute // NewStdSupport creates an instance of default ILM support implementation. +// This contains only the config, and a manager must be created to write and check +// lifecycle policies. I suspect that with enough time/work, you could merge the stdSupport and stdManager objects func NewStdSupport( log *logp.Logger, - enabled bool, - policy Policy, - overwrite, checkExists bool, + lifecycleEnabled bool, ) Supporter { return &stdSupport{ - log: log, - enabled: enabled, - overwrite: overwrite, - checkExists: checkExists, - policy: policy, + log: log, + lifecycleEnabled: lifecycleEnabled, } } -func (s *stdSupport) Enabled() bool { return s.enabled } -func (s *stdSupport) Policy() Policy { return s.policy } -func (s *stdSupport) Overwrite() bool { return s.overwrite } +// Enabled returns true if either ILM or DSL are enabled +func (s *stdSupport) Enabled() bool { return s.lifecycleEnabled } +// Manager returns a standard support manager. unlike the stdSupport object, +// the manager is capable of writing and checking lifecycle policies. func (s *stdSupport) Manager(h ClientHandler) Manager { return &stdManager{ client: h, @@ -75,65 +70,64 @@ func (s *stdSupport) Manager(h ClientHandler) Manager { } } +// CheckEnabled checks to see if lifecycle management is enabled. func (m *stdManager) CheckEnabled() (bool, error) { - if !m.enabled { - return false, nil + ilmEnabled, err := m.client.CheckEnabled() + if err != nil { + return ilmEnabled, err } if m.cache.Valid() { return m.cache.Enabled, nil } - ilmEnabled, err := m.client.CheckILMEnabled(m.enabled) - if err != nil { - return ilmEnabled, err - } - m.cache.Enabled = ilmEnabled m.cache.LastUpdate = time.Now() return ilmEnabled, nil } +// EnsurePolicy creates the upstream lifecycle policy, depending on if it exists, and if overwrite is set. +// returns true if the policy has been created func (m *stdManager) EnsurePolicy(overwrite bool) (bool, error) { log := m.log - if !m.checkExists { - log.Infof("ILM policy is not checked as setup.ilm.check_exists is disabled") + if !m.client.CheckExists() { + log.Infof("lifecycle policy is not checked as check_exists is disabled") return false, nil } - - overwrite = overwrite || m.Overwrite() - name := m.policy.Name + overwrite = overwrite || m.client.Overwrite() + name := m.client.PolicyName() var exists bool if !overwrite { var err error - exists, err = m.client.HasILMPolicy(name) + exists, err = m.client.HasPolicy() if err != nil { - return false, err + return false, fmt.Errorf("error checking if policy %s exists: %w", name, err) } } switch { case exists && !overwrite: - log.Infof("ILM policy %v exists already.", name) + log.Infof("lifecycle policy %v exists already.", name) return false, nil case !exists || overwrite: - err := m.client.CreateILMPolicy(m.policy) + err := m.client.CreatePolicyFromConfig() if err != nil { - log.Errorf("ILM policy %v creation failed: %v", name, err) + log.Errorf("lifecycle policy %v creation failed: %v", name, err) return false, err } - log.Infof("ILM policy %v successfully created.", name) + log.Infof("lifecycle policy %v successfully created.", name) return true, err default: - log.Infof("ILM policy not created: exists=%v, overwrite=%v.", exists, overwrite) + log.Infof("lifecycle policy not created: exists=%v, overwrite=%v.", exists, overwrite) return false, nil } } +// Valid returns true if the cache is valid func (c *infoCache) Valid() bool { return !c.LastUpdate.IsZero() && time.Since(c.LastUpdate) < defaultCacheDuration } diff --git a/libbeat/idxmgmt/ilm/testfiles/custom.json b/libbeat/idxmgmt/lifecycle/testfiles/custom.json similarity index 100% rename from libbeat/idxmgmt/ilm/testfiles/custom.json rename to libbeat/idxmgmt/lifecycle/testfiles/custom.json diff --git a/libbeat/idxmgmt/mockilm_test.go b/libbeat/idxmgmt/mock_manager_test.go similarity index 68% rename from libbeat/idxmgmt/mockilm_test.go rename to libbeat/idxmgmt/mock_manager_test.go index 3997fd0f216..34fc29d13af 100644 --- a/libbeat/idxmgmt/mockilm_test.go +++ b/libbeat/idxmgmt/mock_manager_test.go @@ -21,8 +21,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" - "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/elastic-agent-libs/logp" ) @@ -36,8 +35,8 @@ type onCall struct { returns []interface{} } -func makeMockILMSupport(calls ...onCall) ilm.SupportFactory { - return func(_ *logp.Logger, _ beat.Info, _ *config.C) (ilm.Supporter, error) { +func makeMockILMSupport(calls ...onCall) lifecycle.SupportFactory { + return func(_ *logp.Logger, _ beat.Info, _ bool) (lifecycle.Supporter, error) { m := &mockILMSupport{} for _, c := range calls { m.On(c.name, c.args...).Return(c.returns...) @@ -58,32 +57,44 @@ func (m *mockILMSupport) Enabled() bool { } func onPolicy() onCall { return makeOnCall("Policy") } -func (m *mockILMSupport) Policy() ilm.Policy { +func (m *mockILMSupport) Policy() lifecycle.Policy { args := m.Called() - return args.Get(0).(ilm.Policy) + return args.Get(0).(lifecycle.Policy) } -func onOverwrite() onCall { return makeOnCall("Overwrite") } +// func onMode() onCall { return makeOnCall("Mode") } +func (m *mockILMSupport) Mode() lifecycle.Mode { + args := m.Called() + return args.Get(0).(lifecycle.Mode) +} + +// func onOverwrite() onCall { return makeOnCall("Overwrite") } func (m *mockILMSupport) Overwrite() bool { return m.Called().Bool(0) } -func (m *mockILMSupport) Manager(_ ilm.ClientHandler) ilm.Manager { +func (m *mockILMSupport) Manager(_ lifecycle.ClientHandler) lifecycle.Manager { return m } -func onCheckEnabled() onCall { return makeOnCall("CheckEnabled") } +// func onCheckEnabled() onCall { return makeOnCall("CheckEnabled") } func (m *mockILMSupport) CheckEnabled() (bool, error) { args := m.Called() return args.Bool(0), args.Error(1) } -func onEnsurePolicy() onCall { return makeOnCall("EnsurePolicy") } +// func onEnsurePolicy() onCall { return makeOnCall("EnsurePolicy") } func (m *mockILMSupport) EnsurePolicy(overwrite bool) (bool, error) { args := m.Called() return args.Bool(0), args.Error(1) } +// func onPolicyName() onCall { return makeOnCall("PolicyName") } +func (m *mockILMSupport) PolicyName() string { + args := m.Called() + return args.String(0) +} + func makeOnCall(name string, args ...interface{}) onCall { return onCall{name: name, args: args} } diff --git a/libbeat/idxmgmt/std_test.go b/libbeat/idxmgmt/std_test.go index 6beb62fa64b..4a5dd0f7d33 100644 --- a/libbeat/idxmgmt/std_test.go +++ b/libbeat/idxmgmt/std_test.go @@ -18,6 +18,8 @@ package idxmgmt import ( + "errors" + "fmt" "testing" "time" @@ -25,23 +27,14 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/mapping" "github.com/elastic/beats/v7/libbeat/template" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" ) -type mockClientHandler struct { - policy string - expectsPolicy bool - - tmplCfg *template.TemplateConfig - tmplForce bool - - operations []mockCreateOp -} - type mockCreateOp uint8 const ( @@ -101,7 +94,7 @@ func TestDefaultSupport_BuildSelector(t *testing.T) { ilmTemplateSettings := func(policy string) []onCall { return []onCall{ onEnabled().Return(true), - onPolicy().Return(ilm.Policy{Name: policy}), + onPolicy().Return(lifecycle.Policy{Name: policy}), } } @@ -211,45 +204,55 @@ func TestDefaultSupport_BuildSelector(t *testing.T) { } func TestIndexManager_VerifySetup(t *testing.T) { + info := beat.Info{Beat: "test", Version: "9.9.9"} for name, setup := range map[string]struct { tmplEnabled, ilmEnabled, ilmOverwrite bool loadTmpl, loadILM LoadMode + lifecycle lifecycle.LifecycleConfig ok bool warn string }{ "load template with ilm without loading ilm": { ilmEnabled: true, tmplEnabled: true, loadILM: LoadModeDisabled, - warn: "whithout loading ILM policy", + warn: "whithout loading ILM policy", + lifecycle: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: true, PolicyName: *fmtstr.MustCompileEvent("test")}}, }, "load ilm without template": { ilmEnabled: true, loadILM: LoadModeUnset, - warn: "without loading template is not recommended", + warn: "without loading template is not recommended", + lifecycle: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: true, PolicyName: *fmtstr.MustCompileEvent("test")}}, }, "template disabled but loading enabled": { - loadTmpl: LoadModeEnabled, - warn: "loading not enabled", + loadTmpl: LoadModeEnabled, + warn: "loading not enabled", + lifecycle: lifecycle.DefaultILMConfig(info), }, "ilm disabled but loading enabled": { loadILM: LoadModeEnabled, tmplEnabled: true, - warn: "loading not enabled", + warn: "loading not enabled", + lifecycle: lifecycle.DefaultILMConfig(info), }, "ilm enabled but loading disabled": { ilmEnabled: true, loadILM: LoadModeDisabled, - warn: "loading not enabled", + warn: "loading not enabled", + lifecycle: lifecycle.DefaultILMConfig(info), }, "template enabled but loading disabled": { tmplEnabled: true, loadTmpl: LoadModeDisabled, - warn: "loading not enabled", + warn: "loading not enabled", + lifecycle: lifecycle.DefaultILMConfig(info), }, "ilm enabled but overwrite disabled": { tmplEnabled: true, ilmEnabled: true, ilmOverwrite: false, loadILM: LoadModeEnabled, - warn: "Overwriting ILM policy is disabled", + warn: "Overwriting lifecycle policy is disabled", + lifecycle: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: true, Overwrite: false, PolicyName: *fmtstr.MustCompileEvent("test")}}, }, "everything enabled": { tmplEnabled: true, ilmEnabled: true, ilmOverwrite: true, - ok: true, + lifecycle: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: true, Overwrite: true, PolicyName: *fmtstr.MustCompileEvent("test")}}, + ok: true, }, } { t.Run(name, func(t *testing.T) { @@ -259,9 +262,10 @@ func TestIndexManager_VerifySetup(t *testing.T) { "setup.template.enabled": setup.tmplEnabled, }) require.NoError(t, err) - support, err := MakeDefaultSupport(ilm.StdSupport)(nil, beat.Info{}, cfg) + support, err := MakeDefaultSupport(lifecycle.StdSupport)(nil, beat.Info{}, cfg) + require.NoError(t, err) + clientHandler, err := newMockClientHandler(setup.lifecycle, info) require.NoError(t, err) - clientHandler := newMockClientHandler() manager := support.Manager(clientHandler, nil) ok, warn := manager.VerifySetup(setup.loadTmpl, setup.loadILM) assert.Equal(t, setup.ok, ok) @@ -307,14 +311,15 @@ func TestIndexManager_Setup(t *testing.T) { } info := beat.Info{Beat: "test", Version: "9.9.9"} defaultCfg := template.DefaultConfig(info) - + defaultLifecycleConfig := lifecycle.DefaultILMConfig(info) + dslLifecycleConfig := lifecycle.DefaultDSLConfig(info) cases := map[string]struct { cfg mapstr.M loadTemplate, loadILM LoadMode - - err bool - tmplCfg *template.TemplateConfig - policy string + ilmCfg lifecycle.LifecycleConfig + err bool + tmplCfg *template.TemplateConfig + policy string }{ "template default ilm default": { tmplCfg: cfgWith(template.DefaultConfig(info), map[string]interface{}{ @@ -324,11 +329,23 @@ func TestIndexManager_Setup(t *testing.T) { "settings.index.lifecycle.name": "test", }), policy: "test", + ilmCfg: defaultLifecycleConfig, + }, + "template-default-dsl-config": { + tmplCfg: cfgWith(template.DefaultConfig(info), map[string]interface{}{ + "overwrite": "true", + "name": "test-9.9.9", + "pattern": "test-9.9.9", + "settings.index.lifecycle.name": "test-9.9.9", + }), + policy: "test-9.9.9", + ilmCfg: dslLifecycleConfig, }, "template default ilm default with policy changed": { cfg: mapstr.M{ "setup.ilm.policy_name": "policy-keep", }, + ilmCfg: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: true, CheckExists: true, PolicyName: *fmtstr.MustCompileEvent("policy-keep")}}, tmplCfg: cfgWith(template.DefaultConfig(info), map[string]interface{}{ "overwrite": "true", "name": "test-9.9.9", @@ -342,6 +359,7 @@ func TestIndexManager_Setup(t *testing.T) { "setup.ilm.enabled": false, }, loadTemplate: LoadModeEnabled, + ilmCfg: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: false}}, tmplCfg: &defaultCfg, }, "template default loadMode Overwrite ilm disabled": { @@ -349,6 +367,7 @@ func TestIndexManager_Setup(t *testing.T) { "setup.ilm.enabled": false, }, loadTemplate: LoadModeOverwrite, + ilmCfg: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: false}}, tmplCfg: cfgWith(template.DefaultConfig(info), map[string]interface{}{ "overwrite": "true", "name": "test-9.9.9", @@ -362,6 +381,7 @@ func TestIndexManager_Setup(t *testing.T) { "pattern": "test-9.9.9", }, loadTemplate: LoadModeForce, + ilmCfg: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: false}}, tmplCfg: cfgWith(template.DefaultConfig(info), map[string]interface{}{ "overwrite": "true", }), @@ -370,12 +390,14 @@ func TestIndexManager_Setup(t *testing.T) { cfg: mapstr.M{ "setup.ilm.enabled": false, }, + ilmCfg: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: false}}, loadTemplate: LoadModeDisabled, }, "template disabled ilm default": { cfg: mapstr.M{ "setup.template.enabled": false, }, + ilmCfg: defaultLifecycleConfig, policy: "test", }, "template disabled ilm disabled, loadMode Overwrite": { @@ -383,6 +405,7 @@ func TestIndexManager_Setup(t *testing.T) { "setup.template.enabled": false, "setup.ilm.enabled": false, }, + ilmCfg: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: false}}, loadILM: LoadModeOverwrite, }, "template disabled ilm disabled loadMode Force": { @@ -390,16 +413,19 @@ func TestIndexManager_Setup(t *testing.T) { "setup.template.enabled": false, "setup.ilm.enabled": false, }, + ilmCfg: lifecycle.LifecycleConfig{ILM: lifecycle.Config{Enabled: false}}, loadILM: LoadModeForce, - policy: "test", }, "template loadmode disabled ilm loadMode enabled": { loadTemplate: LoadModeDisabled, loadILM: LoadModeEnabled, + ilmCfg: defaultLifecycleConfig, policy: "test", }, "template default ilm loadMode disabled": { loadILM: LoadModeDisabled, + ilmCfg: defaultLifecycleConfig, + policy: "test", tmplCfg: cfgWith(template.DefaultConfig(info), map[string]interface{}{ "name": "test-9.9.9", "pattern": "test-9.9.9", @@ -408,16 +434,19 @@ func TestIndexManager_Setup(t *testing.T) { }, "template loadmode disabled ilm loadmode disabled": { loadTemplate: LoadModeDisabled, + ilmCfg: defaultLifecycleConfig, loadILM: LoadModeDisabled, + policy: "test", }, } for name, test := range cases { t.Run(name, func(t *testing.T) { - factory := MakeDefaultSupport(ilm.StdSupport) + factory := MakeDefaultSupport(lifecycle.StdSupport) im, err := factory(nil, info, config.MustNewConfigFrom(test.cfg)) require.NoError(t, err) - clientHandler := newMockClientHandler() + clientHandler, err := newMockClientHandler(test.ilmCfg, info) + require.NoError(t, err) manager := im.Manager(clientHandler, BeatsAssets([]byte("testbeat fields"))) err = manager.Setup(test.loadTemplate, test.loadILM) clientHandler.assertInvariants(t) @@ -431,7 +460,7 @@ func TestIndexManager_Setup(t *testing.T) { } else { assert.Equal(t, test.tmplCfg, clientHandler.tmplCfg) } - assert.Equal(t, test.policy, clientHandler.policy) + assert.Equal(t, test.policy, clientHandler.policyName) } }) } @@ -445,8 +474,38 @@ func (op mockCreateOp) String() string { return names[op] } -func newMockClientHandler() *mockClientHandler { - return &mockClientHandler{} +type mockClientHandler struct { + policyName string + installedPolicy bool + + tmplCfg *template.TemplateConfig + tmplForce bool + lifecycle lifecycle.LifecycleConfig + selectedCfg lifecycle.Config + operations []mockCreateOp + mode lifecycle.Mode +} + +func newMockClientHandler(cfg lifecycle.LifecycleConfig, info beat.Info) (*mockClientHandler, error) { + if cfg.ILM.Enabled && cfg.DSL.Enabled { + return nil, errors.New("both ILM and DSL enabled") + } + + selectedCfg := cfg.ILM + if cfg.DSL.Enabled { + selectedCfg = cfg.DSL + } + + var name string + var err error + if selectedCfg.Enabled { + name, err = lifecycle.ApplyStaticFmtstr(info, selectedCfg.PolicyName) + if err != nil { + return nil, fmt.Errorf("error applying formatting string for template name: %w", err) + } + } + + return &mockClientHandler{selectedCfg: selectedCfg, policyName: name}, nil } func (h *mockClientHandler) Load(config template.TemplateConfig, _ beat.Info, fields []byte, migration bool) error { @@ -456,20 +515,53 @@ func (h *mockClientHandler) Load(config template.TemplateConfig, _ beat.Info, fi return nil } -func (h *mockClientHandler) CheckILMEnabled(enabled bool) (bool, error) { - return enabled, nil +func (h *mockClientHandler) CheckEnabled() (bool, error) { + return h.selectedCfg.Enabled, nil +} + +func (h *mockClientHandler) CheckExists() bool { + return h.selectedCfg.CheckExists +} + +func (h *mockClientHandler) Overwrite() bool { + return h.selectedCfg.Overwrite +} + +func (h *mockClientHandler) HasPolicy() (bool, error) { + return h.installedPolicy, nil +} + +func (h *mockClientHandler) PolicyName() string { + return h.policyName } -func (h *mockClientHandler) HasILMPolicy(name string) (bool, error) { - return h.policy == name, nil +func (h *mockClientHandler) Policy() lifecycle.Policy { + return lifecycle.Policy{} } -func (h *mockClientHandler) CreateILMPolicy(policy ilm.Policy) error { +func (h *mockClientHandler) Mode() lifecycle.Mode { + return h.mode +} + +func (h *mockClientHandler) IsElasticsearch() bool { + return true +} + +func (h *mockClientHandler) createILMPolicy(policy lifecycle.Policy) error { h.recordOp(mockCreatePolicy) - h.policy = policy.Name + h.policyName = policy.Name return nil } +func (h *mockClientHandler) CreatePolicyFromConfig() error { + h.installedPolicy = true + h.recordOp(mockCreatePolicy) + if h.lifecycle.DSL.Enabled { + return h.createILMPolicy(lifecycle.Policy{Name: h.policyName, Body: lifecycle.DefaultILMPolicy}) + } + return h.createILMPolicy(lifecycle.Policy{Name: h.policyName, Body: lifecycle.DefaultDSLPolicy}) +} + func (h *mockClientHandler) recordOp(op mockCreateOp) { h.operations = append(h.operations, op) } diff --git a/libbeat/template/config.go b/libbeat/template/config.go index 9e8a2c24632..db2c688d1c1 100644 --- a/libbeat/template/config.go +++ b/libbeat/template/config.go @@ -42,8 +42,9 @@ type TemplateConfig struct { // TemplateSettings are part of the Elasticsearch template and hold index and source specific information. type TemplateSettings struct { - Index map[string]interface{} `config:"index"` - Source map[string]interface{} `config:"_source"` + Index map[string]interface{} `config:"index"` + Source map[string]interface{} `config:"_source"` + Lifecycle map[string]interface{} `config:"lifecycle"` } // DefaultConfig for index template diff --git a/libbeat/template/load.go b/libbeat/template/load.go index 70afe7df09c..c2c1375664d 100644 --- a/libbeat/template/load.go +++ b/libbeat/template/load.go @@ -26,6 +26,7 @@ import ( "os" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/paths" @@ -39,9 +40,10 @@ type Loader interface { // ESLoader implements Loader interface for loading templates to Elasticsearch. type ESLoader struct { - client ESClient - builder *templateBuilder - log *logp.Logger + client ESClient + lifecycleClient lifecycle.ClientHandler + builder *templateBuilder + log *logp.Logger } // ESClient is a subset of the Elasticsearch client API capable of @@ -49,6 +51,7 @@ type ESLoader struct { type ESClient interface { Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) GetVersion() version.V + IsServerless() bool } // FileLoader implements Loader interface for loading templates to a File. @@ -69,30 +72,34 @@ type StatusError struct { } type templateBuilder struct { - log *logp.Logger + log *logp.Logger + isServerless bool } // NewESLoader creates a new template loader for ES -func NewESLoader(client ESClient) *ESLoader { - return &ESLoader{client: client, builder: newTemplateBuilder(), log: logp.NewLogger("template_loader")} +func NewESLoader(client ESClient, lifecycleClient lifecycle.ClientHandler) (*ESLoader, error) { + if client == nil { + return nil, errors.New("can not load template without active Elasticsearch client") + } + return &ESLoader{client: client, lifecycleClient: lifecycleClient, + builder: newTemplateBuilder(client.IsServerless()), log: logp.NewLogger("template_loader")}, nil } // NewFileLoader creates a new template loader for the given file. -func NewFileLoader(c FileClient) *FileLoader { - return &FileLoader{client: c, builder: newTemplateBuilder(), log: logp.NewLogger("file_template_loader")} +func NewFileLoader(c FileClient, isServerless bool) *FileLoader { + // other components of the file loader will fail if both ILM and DSL are set, + // so at this point it's fairly safe to just pass cfg.DSL.Enabled + return &FileLoader{client: c, builder: newTemplateBuilder(isServerless), log: logp.NewLogger("file_template_loader")} } -func newTemplateBuilder() *templateBuilder { - return &templateBuilder{log: logp.NewLogger("template")} +func newTemplateBuilder(serverlessMode bool) *templateBuilder { + return &templateBuilder{log: logp.NewLogger("template"), isServerless: serverlessMode} } // Load checks if the index mapping template should be loaded. // In case the template is not already loaded or overwriting is enabled, the // template is built and written to index. func (l *ESLoader) Load(config TemplateConfig, info beat.Info, fields []byte, migration bool) error { - if l.client == nil { - return errors.New("can not load template without active Elasticsearch client") - } // build template from config tmpl, err := l.builder.template(config, info, l.client.GetVersion(), migration) @@ -141,6 +148,17 @@ func (l *ESLoader) Load(config TemplateConfig, info beat.Info, fields []byte, mi } if dataStreamExist { l.log.Infof("Data stream with name %q already exists.", templateName) + // for serverless, we can update the lifecycle policy safely + // Note that updating the lifecycle will delete older documents + // if the policy requires it; i.e, changing the data_retention from 10d to 7d + // will delete the documents older than 7 days. + if l.client.IsServerless() { + l.log.Infof("overwriting lifecycle policy") + err = l.lifecycleClient.CreatePolicyFromConfig() + if err != nil { + return fmt.Errorf("error updating lifecycle policy: %w", err) + } + } return nil } @@ -233,7 +251,7 @@ func (b *templateBuilder) template(config TemplateConfig, info beat.Info, esVers b.log.Info("template config not enabled") return nil, nil } - tmpl, err := New(info.Version, info.IndexPrefix, info.ElasticLicensed, esVersion, config, migration) + tmpl, err := New(b.isServerless, info.Version, info.IndexPrefix, info.ElasticLicensed, esVersion, config, migration) if err != nil { return nil, fmt.Errorf("error creating template instance: %w", err) } diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index 278c5def725..cce304798d9 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -38,6 +38,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/esleg/eslegtest" + "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/transport/httpcommon" @@ -68,10 +69,14 @@ func newTestSetup(t *testing.T, cfg TemplateConfig) *testSetup { if err := client.Connect(); err != nil { t.Fatal(err) } - s := testSetup{t: t, client: client, loader: NewESLoader(client), config: cfg} - client.Request("DELETE", "/_data_stream/"+cfg.Name, "", nil, nil) + handler := &mockClientHandler{severless: false, mode: lifecycle.ILM} + loader, err := NewESLoader(client, handler) + require.NoError(t, err) + s := testSetup{t: t, client: client, loader: loader, config: cfg} + // don't care if the cleanup fails, since they might just return a 404 + _, _, _ = client.Request("DELETE", "/_data_stream/"+cfg.Name, "", nil, nil) s.requireDataStreamDoesNotExist("") - client.Request("DELETE", "/_index_template/"+cfg.Name, "", nil, nil) + _, _, _ = client.Request("DELETE", "/_index_template/"+cfg.Name, "", nil, nil) s.requireTemplateDoesNotExist("") return &s } @@ -81,14 +86,10 @@ func newTestSetupWithESClient(t *testing.T, client ESClient, cfg TemplateConfig) if cfg.Name == "" { cfg.Name = fmt.Sprintf("load-test-%+v", rand.Int()) } - return &testSetup{t: t, client: client, loader: NewESLoader(client), config: cfg} -} - -func (ts *testSetup) mustLoadTemplate(body map[string]interface{}) { - ts.t.Helper() - err := ts.loader.loadTemplate(ts.config.Name, body) - require.NoError(ts.t, err) - ts.requireTemplateExists("") + handler := &mockClientHandler{severless: false, mode: lifecycle.ILM} + loader, err := NewESLoader(client, handler) + require.NoError(t, err) + return &testSetup{t: t, client: client, loader: loader, config: cfg} } func (ts *testSetup) loadFromFile(fileElems []string) error { @@ -122,12 +123,14 @@ func (ts *testSetup) requireTemplateExists(name string) { } func (ts *testSetup) cleanupDataStream(name string) { - ts.client.Request("DELETE", "/_data_stream/"+name, "", nil, nil) + _, _, err := ts.client.Request("DELETE", "/_data_stream/"+name, "", nil, nil) + require.NoError(ts.t, err) ts.requireDataStreamDoesNotExist(name) } func (ts *testSetup) cleanupTemplate(name string) { - ts.client.Request("DELETE", "/_index_template/"+name, "", nil, nil) + _, _, err := ts.client.Request("DELETE", "/_index_template/"+name, "", nil, nil) + require.NoError(ts.t, err) ts.requireTemplateDoesNotExist(name) } @@ -172,6 +175,7 @@ func (ts *testSetup) requireTestEventPresent() string { var resp eslegclient.SearchResults err = json.Unmarshal(b, &resp) + require.NoError(ts.t, err) require.Equal(ts.t, 1, resp.Hits.Total.Value, "the test event must be returned") idx := struct { @@ -187,7 +191,8 @@ func TestESLoader_Load(t *testing.T) { t.Run("loading disabled", func(t *testing.T) { setup := newTestSetup(t, TemplateConfig{Enabled: false}) - setup.load(nil) + err := setup.load(nil) + require.NoError(t, err) setup.requireTemplateDoesNotExist("") }) @@ -200,15 +205,6 @@ func TestESLoader_Load(t *testing.T) { require.Contains(t, err.Error(), "version is not semver") }) - t.Run("no Elasticsearch client", func(t *testing.T) { - setup := newTestSetupWithESClient(t, nil, TemplateConfig{Enabled: true}) - - beatInfo := beat.Info{Version: "9.9.9"} - err := setup.loader.Load(setup.config, beatInfo, nil, false) - require.Error(t, err) - require.Contains(t, err.Error(), "can not load template without active Elasticsearch client") - }) - t.Run("cannot check template", func(t *testing.T) { m := getMockElasticsearchClient(t, "HEAD", "/_index_template/", 500, []byte("cannot check template")) setup := newTestSetupWithESClient(t, m, TemplateConfig{Enabled: true}) @@ -259,14 +255,16 @@ func TestESLoader_Load(t *testing.T) { setup.config.Settings = TemplateSettings{Source: map[string]interface{}{"enabled": false}} t.Run("disabled", func(t *testing.T) { - setup.load(nil) + err := setup.load(nil) + require.NoError(t, err) tmpl := getTemplate(t, setup.client, setup.config.Name) assert.Equal(t, true, tmpl.SourceEnabled()) }) t.Run("enabled", func(t *testing.T) { setup.config.Overwrite = true - setup.load(nil) + err := setup.load(nil) + require.NoError(t, err) tmpl := getTemplate(t, setup.client, setup.config.Name) assert.Equal(t, false, tmpl.SourceEnabled()) }) @@ -278,6 +276,7 @@ func TestESLoader_Load(t *testing.T) { setup.mustLoad(fields) exists, err := setup.loader.checkExistsDatastream(setup.config.Name) + require.NoError(t, err) require.True(t, exists, "data stream must exits") // send test event before reloading the template @@ -304,7 +303,8 @@ func TestESLoader_Load(t *testing.T) { Name string `config:"name"` IsDataStream bool `config:"data_stream"` }{Enabled: true, Path: path(t, []string{"testdata", "fields.json"}), Name: nameJSON, IsDataStream: false} - setup.load(nil) + err := setup.load(nil) + require.NoError(t, err) setup.requireTemplateExists(nameJSON) setup.cleanupTemplate(nameJSON) }) @@ -508,7 +508,7 @@ func getTemplate(t *testing.T, client ESClient, templateName string) testTemplat } func (tt *testTemplate) SourceEnabled() bool { - key := fmt.Sprintf("template.mappings._source.enabled") + key := "template.mappings._source.enabled" // _source.enabled is true if it's missing (default) b, _ := tt.HasKey(key) @@ -519,7 +519,7 @@ func (tt *testTemplate) SourceEnabled() bool { val, err := tt.GetValue(key) if !assert.NoError(tt.t, err) { doc, _ := json.MarshalIndent(tt.M, "", " ") - tt.t.Fatal(fmt.Sprintf("failed to read '%v' in %s", key, doc)) + tt.t.Fatalf("failed to read '%v' in %s", key, doc) } return val.(bool) @@ -563,6 +563,22 @@ func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection { return conn } +type mockClientHandler struct { + severless bool + mode lifecycle.Mode +} + +func (cli *mockClientHandler) IsServerless() bool { return cli.severless } +func (cli *mockClientHandler) CheckEnabled() (bool, error) { return true, nil } +func (cli *mockClientHandler) Mode() lifecycle.Mode { return cli.mode } +func (cli *mockClientHandler) IsElasticsearch() bool { return true } +func (cli *mockClientHandler) CheckExists() bool { return true } +func (cli *mockClientHandler) PolicyName() string { return "test" } +func (cli *mockClientHandler) HasPolicy() (bool, error) { return false, nil } +func (cli *mockClientHandler) CreatePolicyFromConfig() error { return nil } +func (cli *mockClientHandler) Policy() lifecycle.Policy { return lifecycle.Policy{Name: "test"} } +func (cli *mockClientHandler) Overwrite() bool { return true } + func getMockElasticsearchClient(t *testing.T, method, endpoint string, code int, body []byte) *eslegclient.Connection { server := esMock(t, method, endpoint, code, body) conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ @@ -580,14 +596,16 @@ func esMock(t *testing.T, method, endpoint string, code int, body []byte) *httpt if r.URL.Path == "/" { w.WriteHeader(200) w.Header().Set("Content-Type", "application/json") - w.Write([]byte(`{"version":{"number":"5.0.0"}}`)) + _, err := w.Write([]byte(`{"version":{"number":"5.0.0"}}`)) + require.NoError(t, err) return } if r.Method == method && strings.HasPrefix(r.URL.Path, endpoint) { w.WriteHeader(code) w.Header().Set("Content-Type", "application/json") - w.Write(body) + _, err := w.Write(body) + require.NoError(t, err) return } @@ -600,7 +618,8 @@ func esMock(t *testing.T, method, endpoint string, code int, body []byte) *httpt w.WriteHeader(c) if body != nil { w.Header().Set("Content-Type", "application/json") - w.Write(body) + _, err := w.Write(body) + require.NoError(t, err) } })) } diff --git a/libbeat/template/load_test.go b/libbeat/template/load_test.go index 59eca112251..8f6b1837d9a 100644 --- a/libbeat/template/load_test.go +++ b/libbeat/template/load_test.go @@ -37,13 +37,25 @@ func TestFileLoader_Load(t *testing.T) { tmplName := fmt.Sprintf("%s-%s", prefix, ver) for name, test := range map[string]struct { - settings TemplateSettings - body mapstr.M - fields []byte - want mapstr.M - wantErr error + settings TemplateSettings + isServerless bool + body mapstr.M + fields []byte + want mapstr.M + wantErr error }{ "load minimal config info": { + isServerless: false, + body: mapstr.M{ + "index_patterns": []string{"mock-7.0.0"}, + "data_stream": struct{}{}, + "priority": 150, + "template": mapstr.M{ + "settings": mapstr.M{"index": nil}}, + }, + }, + "load minimal config info serverless": { + isServerless: true, body: mapstr.M{ "index_patterns": []string{"mock-7.0.0"}, "data_stream": struct{}{}, @@ -53,7 +65,8 @@ func TestFileLoader_Load(t *testing.T) { }, }, "load minimal config with index settings": { - settings: TemplateSettings{Index: mapstr.M{"code": "best_compression"}}, + isServerless: false, + settings: TemplateSettings{Index: mapstr.M{"code": "best_compression"}}, body: mapstr.M{ "index_patterns": []string{"mock-7.0.0"}, "data_stream": struct{}{}, @@ -63,7 +76,8 @@ func TestFileLoader_Load(t *testing.T) { }, }, "load minimal config with source settings": { - settings: TemplateSettings{Source: mapstr.M{"enabled": false}}, + isServerless: false, + settings: TemplateSettings{Source: mapstr.M{"enabled": false}}, body: mapstr.M{ "index_patterns": []string{"mock-7.0.0"}, "data_stream": struct{}{}, @@ -80,6 +94,7 @@ func TestFileLoader_Load(t *testing.T) { }, }, "load config and in-line analyzer fields": { + isServerless: false, body: mapstr.M{ "index_patterns": []string{"mock-7.0.0"}, "data_stream": struct{}{}, @@ -178,6 +193,7 @@ func TestFileLoader_Load(t *testing.T) { }, }, "load config and in-line analyzer fields with name collision": { + isServerless: false, body: mapstr.M{ "index_patterns": []string{"mock-7.0.0"}, "settings": mapstr.M{"index": nil}, @@ -210,7 +226,7 @@ func TestFileLoader_Load(t *testing.T) { } { t.Run(name, func(t *testing.T) { fc := newFileClient(ver) - fl := NewFileLoader(fc) + fl := NewFileLoader(fc, test.isServerless) cfg := DefaultConfig(info) cfg.Settings = test.settings diff --git a/libbeat/template/template.go b/libbeat/template/template.go index 7c43efb8d7d..ae67e7ec542 100644 --- a/libbeat/template/template.go +++ b/libbeat/template/template.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/version" "github.com/elastic/go-ucfg/yaml" @@ -35,7 +36,6 @@ var ( // Defaults used in the template defaultDateDetection = false defaultTotalFieldsLimit = 10000 - defaultNumberOfRoutingShards = 30 defaultMaxDocvalueFieldsSearch = 200 defaultFields []string @@ -52,12 +52,13 @@ type Template struct { esVersion version.V config TemplateConfig migration bool - order int priority int + isServerless bool } // New creates a new template instance func New( + isServerless bool, beatVersion string, beatName string, elasticLicensed bool, @@ -137,6 +138,7 @@ func New( config: config, migration: migration, priority: config.Priority, + isServerless: isServerless, }, nil } @@ -198,6 +200,10 @@ func (t *Template) LoadMinimal() mapstr.M { nil, nil, mapstr.M(t.config.Settings.Source)) } + // delete default settings not available on serverless + if _, ok := t.config.Settings.Index["number_of_shards"]; ok && t.isServerless { + delete(t.config.Settings.Index, "number_of_shards") + } templ["settings"] = mapstr.M{ "index": t.config.Settings.Index, } @@ -242,10 +248,14 @@ func (t *Template) generateComponent(properties, analyzers mapstr.M, dynamicTemp "index": buildIdxSettings( t.esVersion, t.config.Settings.Index, + t.isServerless, ), }, }, } + if len(t.config.Settings.Lifecycle) > 0 { + m.Put("template.lifecycle", t.config.Settings.Lifecycle) + } if len(analyzers) != 0 { m.Put("template.settings.analysis.analyzer", analyzers) } @@ -288,7 +298,7 @@ func buildDynTmpl(ver version.V) mapstr.M { } } -func buildIdxSettings(ver version.V, userSettings mapstr.M) mapstr.M { +func buildIdxSettings(ver version.V, userSettings mapstr.M, isServerless bool) mapstr.M { indexSettings := mapstr.M{ "refresh_interval": "5s", "mapping": mapstr.M{ @@ -305,7 +315,13 @@ func buildIdxSettings(ver version.V, userSettings mapstr.M) mapstr.M { indexSettings.Put("query.default_field", fields) - indexSettings.Put("max_docvalue_fields_search", defaultMaxDocvalueFieldsSearch) + // deal with settings that aren't available on serverless + if isServerless { + logp.L().Infof("remote instance is severless, number_of_shards and max_docvalue_fields_search will be skipped in index template") + userSettings.Delete("number_of_shards") + } else { + indexSettings.Put("max_docvalue_fields_search", defaultMaxDocvalueFieldsSearch) + } indexSettings.DeepUpdate(userSettings) return indexSettings diff --git a/libbeat/template/template_test.go b/libbeat/template/template_test.go index 5ef4371c7be..c1d24eab869 100644 --- a/libbeat/template/template_test.go +++ b/libbeat/template/template_test.go @@ -31,7 +31,7 @@ import ( libversion "github.com/elastic/elastic-agent-libs/version" ) -type testTemplate struct { +type unitTestTemplate struct { t *testing.T tmpl *Template data mapstr.M @@ -109,19 +109,19 @@ func TestTemplate(t *testing.T) { }) } -func createTestTemplate(t *testing.T, beatVersion, esVersion string, config TemplateConfig) *testTemplate { +func createTestTemplate(t *testing.T, beatVersion, esVersion string, config TemplateConfig) *unitTestTemplate { beatVersion = getVersion(beatVersion) esVersion = getVersion(esVersion) ver := libversion.MustNew(esVersion) - template, err := New(beatVersion, "testbeat", false, *ver, config, false) + template, err := New(false, beatVersion, "testbeat", false, *ver, config, false) if err != nil { t.Fatalf("Failed to create the template: %+v", err) } - return &testTemplate{t: t, tmpl: template, data: template.Generate(nil, nil, nil)} + return &unitTestTemplate{t: t, tmpl: template, data: template.Generate(nil, nil, nil)} } -func (t *testTemplate) Has(path string) bool { +func (t *unitTestTemplate) Has(path string) bool { t.t.Helper() has, err := t.data.HasKey(path) if err != nil && err != mapstr.ErrKeyNotFound { @@ -131,7 +131,7 @@ func (t *testTemplate) Has(path string) bool { return has } -func (t *testTemplate) Get(path string) interface{} { +func (t *unitTestTemplate) Get(path string) interface{} { t.t.Helper() val, err := t.data.GetValue(path) if err != nil { @@ -141,14 +141,14 @@ func (t *testTemplate) Get(path string) interface{} { return val } -func (t *testTemplate) AssertMissing(path string) { +func (t *unitTestTemplate) AssertMissing(path string) { t.t.Helper() if t.Has(path) { t.t.Fatalf("Expected '%v' to be missing", path) } } -func (t *testTemplate) Assert(path string, val interface{}) { +func (t *unitTestTemplate) Assert(path string, val interface{}) { t.t.Helper() assert.Equal(t.t, val, t.Get(path)) } diff --git a/libbeat/tests/system/template/template.go b/libbeat/tests/system/template/template.go index 12b7cff7eef..ea869f0f962 100644 --- a/libbeat/tests/system/template/template.go +++ b/libbeat/tests/system/template/template.go @@ -48,7 +48,7 @@ func testTemplateDefaultFieldLength(beatName string, elasticLicensed bool) func( } // Generate a template based on the embedded fields.yml data. - tmpl, err := template.New(version.GetDefaultVersion(), beatName, elasticLicensed, *esVersion, template.TemplateConfig{}, false) + tmpl, err := template.New(false, version.GetDefaultVersion(), beatName, elasticLicensed, *esVersion, template.TemplateConfig{}, false) if err != nil { t.Fatal(err) } diff --git a/libbeat/tests/system/test_ilm.py b/libbeat/tests/system/test_ilm.py index 1f2b89ac562..630ab551593 100644 --- a/libbeat/tests/system/test_ilm.py +++ b/libbeat/tests/system/test_ilm.py @@ -13,7 +13,7 @@ INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False) -MSG_ILM_POLICY_LOADED = re.compile('ILM policy .* successfully created.') +MSG_ILM_POLICY_LOADED = re.compile('lifecycle policy .* successfully created.') class TestRunILM(BaseTest): diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index a03fbf080b4..3fc3c2a011c 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -2173,6 +2173,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is metricbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "metricbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index f1e036707b7..4c90eb43b9f 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -1804,6 +1804,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is packetbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "packetbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 1c8ebebc91a..bf94da0c118 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -1220,6 +1220,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is winlogbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "winlogbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index 7b575e6b0e8..9e08ae7c167 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -1394,6 +1394,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is auditbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "auditbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index ff9b523f947..5768cb55689 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -4804,6 +4804,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is filebeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "filebeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index 5c42e49b44f..3f2308b6eff 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -1058,6 +1058,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is functionbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "functionbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index fdbdcbe7042..ff2ac644a9d 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -1430,6 +1430,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is heartbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "heartbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 5ea3499cee5..eb685b2322f 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -2734,6 +2734,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is metricbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "metricbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/osquerybeat/osquerybeat.reference.yml b/x-pack/osquerybeat/osquerybeat.reference.yml index 9d2c21c5ad1..7963a71ea5f 100644 --- a/x-pack/osquerybeat/osquerybeat.reference.yml +++ b/x-pack/osquerybeat/osquerybeat.reference.yml @@ -777,6 +777,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is osquerybeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "osquerybeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index f1e036707b7..4c90eb43b9f 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -1804,6 +1804,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is packetbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "packetbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. diff --git a/x-pack/winlogbeat/winlogbeat.reference.yml b/x-pack/winlogbeat/winlogbeat.reference.yml index 0ec02b0e5ae..8d0606ba516 100644 --- a/x-pack/winlogbeat/winlogbeat.reference.yml +++ b/x-pack/winlogbeat/winlogbeat.reference.yml @@ -1222,6 +1222,30 @@ setup.template.settings: # Overwrite the lifecycle policy at startup. The default is false. #setup.ilm.overwrite: false +# ======================== Data Stream Lifecycle (DSL) ========================= + +# Configure Data Stream Lifecycle to manage data streams while connected to Serverless elasticsearch. +# These settings are mutually exclusive with ILM settings which are not supported in Serverless projects. + +# Enable DSL support. Valid values are true, or false. +#setup.dsl.enabled: true + +# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for. +# The default data stream pattern is winlogbeat-%{[agent.version]}" +#setup.dsl.data_stream_pattern: "winlogbeat-%{[agent.version]}" + +# The path to a JSON file that contains a lifecycle policy configuration. Used +# to load your own lifecycle policy. +# If no custom policy is specified, a default policy with a lifetime of 7 days will be created. +#setup.dsl.policy_file: + +# Disable the check for an existing lifecycle policy. The default is true. If +# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy +# can be installed. +#setup.dsl.check_exists: true + +# Overwrite the lifecycle policy at startup. The default is false. +#setup.dsl.overwrite: false # =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.