Support serverless (#36649)
* make serverless integration tests run

* update deps

* linter, error handling

* still fixing error handling

* fixing old formatting verbs

* still finding format verbs

* add docs, fix typos

* initial functional pass

* fix setup, config

* fix naming of config section

* add headers

* make linter happy

* still making linter happy

* tinkering with tests

* still fixing tests

* revert file

* tinker with export

* fix logging in tests

* fix load checking in setup

* fix url in integration test

* fix commented out test line

* still tinkering with integration test

* fix bad init in tests, add more check to ES handler

* add init checks for client handler, add more unit tests

* make template loader serverless aware

* change naming, error handling, rework config system

* fix up integration tests

* clean up load tests

* still making linter happy

* simplify manager init, fix tests, update docs

* minor test fixes

* clean up tests

* clean up typos, remove legacy error handling

* expand logging

* logging, error handling changes

* change error messages

* update lifetimes for serverless elasticsearch

* fix integration tests

* change error handling, clean up log messages

* tinker with DSL config name

* update docs

* fix name example
fearful-symmetry authored Oct 4, 2023
1 parent df10d97 commit 1d70a83
Showing 53 changed files with 2,047 additions and 919 deletions.
24 changes: 24 additions & 0 deletions auditbeat/auditbeat.reference.yml
@@ -1338,6 +1338,30 @@ setup.template.settings:
# Overwrite the lifecycle policy at startup. The default is false.
#setup.ilm.overwrite: false

# ======================== Data Stream Lifecycle (DSL) =========================

# Configure Data Stream Lifecycle to manage data streams while connected to serverless Elasticsearch.
# These settings are mutually exclusive with ILM settings, which are not supported in serverless projects.

# Enable DSL support. Valid values are true or false.
#setup.dsl.enabled: true

# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for.
# The default data stream pattern is "auditbeat-%{[agent.version]}".
#setup.dsl.data_stream_pattern: "auditbeat-%{[agent.version]}"

# The path to a JSON file that contains a lifecycle policy configuration. Used
# to load your own lifecycle policy.
# If no custom policy is specified, a default policy with a lifetime of 7 days will be created.
#setup.dsl.policy_file:

# Disable the check for an existing lifecycle policy. The default is true. If
# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy
# can be installed.
#setup.dsl.check_exists: true

# Overwrite the lifecycle policy at startup. The default is false.
#setup.dsl.overwrite: false
# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
24 changes: 24 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -2434,6 +2434,30 @@ setup.template.settings:
# Overwrite the lifecycle policy at startup. The default is false.
#setup.ilm.overwrite: false

# ======================== Data Stream Lifecycle (DSL) =========================

# Configure Data Stream Lifecycle to manage data streams while connected to serverless Elasticsearch.
# These settings are mutually exclusive with ILM settings, which are not supported in serverless projects.

# Enable DSL support. Valid values are true or false.
#setup.dsl.enabled: true

# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for.
# The default data stream pattern is "filebeat-%{[agent.version]}".
#setup.dsl.data_stream_pattern: "filebeat-%{[agent.version]}"

# The path to a JSON file that contains a lifecycle policy configuration. Used
# to load your own lifecycle policy.
# If no custom policy is specified, a default policy with a lifetime of 7 days will be created.
#setup.dsl.policy_file:

# Disable the check for an existing lifecycle policy. The default is true. If
# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy
# can be installed.
#setup.dsl.check_exists: true

# Overwrite the lifecycle policy at startup. The default is false.
#setup.dsl.overwrite: false
# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
24 changes: 24 additions & 0 deletions heartbeat/heartbeat.reference.yml
@@ -1430,6 +1430,30 @@ setup.template.settings:
# Overwrite the lifecycle policy at startup. The default is false.
#setup.ilm.overwrite: false

# ======================== Data Stream Lifecycle (DSL) =========================

# Configure Data Stream Lifecycle to manage data streams while connected to serverless Elasticsearch.
# These settings are mutually exclusive with ILM settings, which are not supported in serverless projects.

# Enable DSL support. Valid values are true or false.
#setup.dsl.enabled: true

# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for.
# The default data stream pattern is "heartbeat-%{[agent.version]}".
#setup.dsl.data_stream_pattern: "heartbeat-%{[agent.version]}"

# The path to a JSON file that contains a lifecycle policy configuration. Used
# to load your own lifecycle policy.
# If no custom policy is specified, a default policy with a lifetime of 7 days will be created.
#setup.dsl.policy_file:

# Disable the check for an existing lifecycle policy. The default is true. If
# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy
# can be installed.
#setup.dsl.check_exists: true

# Overwrite the lifecycle policy at startup. The default is false.
#setup.dsl.overwrite: false
# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
1 change: 1 addition & 0 deletions libbeat/_meta/config/default.reference.yml.tmpl
@@ -14,6 +14,7 @@
{{template "setup.dashboards.reference.yml.tmpl" .}}
{{template "setup.template.reference.yml.tmpl" .}}
{{template "setup.ilm.reference.yml.tmpl" .}}
{{template "setup.dsl.reference.yml.tmpl" .}}
{{template "setup.kibana.reference.yml.tmpl" .}}
{{template "logging.reference.yml.tmpl" .}}
{{template "monitoring.reference.yml.tmpl" .}}
24 changes: 24 additions & 0 deletions libbeat/_meta/config/setup.dsl.reference.yml.tmpl
@@ -0,0 +1,24 @@
{{header "Data Stream Lifecycle (DSL)"}}

# Configure Data Stream Lifecycle to manage data streams while connected to serverless Elasticsearch.
# These settings are mutually exclusive with ILM settings, which are not supported in serverless projects.

# Enable DSL support. Valid values are true or false.
#setup.dsl.enabled: true

# Set the lifecycle policy name or pattern. For DSL, this name must match the data stream that the lifecycle is for.
# The default data stream pattern is "{{.BeatName}}-%{[agent.version]}".
#setup.dsl.data_stream_pattern: "{{.BeatName}}-%{[agent.version]}"

# The path to a JSON file that contains a lifecycle policy configuration. Used
# to load your own lifecycle policy.
# If no custom policy is specified, a default policy with a lifetime of 7 days will be created.
#setup.dsl.policy_file:

# Disable the check for an existing lifecycle policy. The default is true. If
# you disable this check, set setup.dsl.overwrite: true so the lifecycle policy
# can be installed.
#setup.dsl.check_exists: true

# Overwrite the lifecycle policy at startup. The default is false.
#setup.dsl.overwrite: false
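The template above renders into each Beat's reference config shown earlier. As a rough, hypothetical sketch of what an enabled DSL setup might look like for Filebeat, assembled only from the options documented in this file (the policy-file path and its JSON contents are assumptions, since the diff does not show the expected schema):

```yaml
# Hypothetical example: ship to a serverless Elasticsearch project using DSL.
# Leave setup.ilm.* unset; ILM and DSL settings are mutually exclusive.
setup.dsl.enabled: true

# Must match the data stream the Beat writes to; this mirrors the documented default.
setup.dsl.data_stream_pattern: "filebeat-%{[agent.version]}"

# Optional: supply a custom lifecycle policy instead of the default 7-day lifetime.
# The expected JSON schema is not shown in this diff; a data stream lifecycle body
# such as {"data_retention": "30d"} is assumed here.
#setup.dsl.policy_file: "/etc/filebeat/dsl-policy.json"

# Replace any existing lifecycle policy at startup.
setup.dsl.overwrite: true
```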
12 changes: 9 additions & 3 deletions libbeat/cmd/export/ilm_policy.go
@@ -22,7 +22,7 @@ import (

"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
)

// GenGetILMPolicyCmd is the command used to export the ilm policy.
@@ -35,14 +35,20 @@ func GenGetILMPolicyCmd(settings instance.Settings) *cobra.Command {
dir, _ := cmd.Flags().GetString("dir")

if settings.ILM == nil {
settings.ILM = ilm.StdSupport
settings.ILM = lifecycle.StdSupport
}
b, err := instance.NewInitializedBeat(settings)
if err != nil {
fatalfInitCmd(err)
}

clientHandler := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version))
// Whether we export ILM or DSL is decided by the user's config. This means that if a user has no
// index management config, we default to ILM regardless of what they are connected to. That may
// not be a problem, since a user without any custom lifecycle config has nothing to export.
clientHandler, err := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version), b.Info, b.Config.LifecycleConfig)
if err != nil {
fatalf("error creating file handler: %s", err)
}
idxManager := b.IdxSupporter.Manager(clientHandler, idxmgmt.BeatsAssets(b.Fields))
if err := idxManager.Setup(idxmgmt.LoadModeDisabled, idxmgmt.LoadModeForce); err != nil {
fatalf("Error exporting ilm-policy: %+v.", err)
11 changes: 7 additions & 4 deletions libbeat/cmd/export/template.go
@@ -22,7 +22,7 @@ import (

"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
)

// GenTemplateConfigCmd is the command used to export the elasticsearch template.
@@ -36,18 +36,21 @@ func GenTemplateConfigCmd(settings instance.Settings) *cobra.Command {
noILM, _ := cmd.Flags().GetBool("noilm")

if noILM {
settings.ILM = ilm.NoopSupport
settings.ILM = lifecycle.NoopSupport
}
if settings.ILM == nil {
settings.ILM = ilm.StdSupport
settings.ILM = lifecycle.StdSupport
}

b, err := instance.NewInitializedBeat(settings)
if err != nil {
fatalfInitCmd(err)
}

clientHandler := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version))
clientHandler, err := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version), b.Info, b.Config.LifecycleConfig)
if err != nil {
fatalf("error creating file handler: %s", err)
}
idxManager := b.IdxSupporter.Manager(clientHandler, idxmgmt.BeatsAssets(b.Fields))
if err := idxManager.Setup(idxmgmt.LoadModeForce, idxmgmt.LoadModeDisabled); err != nil {
fatalf("Error exporting template: %+v.", err)
18 changes: 16 additions & 2 deletions libbeat/cmd/instance/beat.go
@@ -54,6 +54,7 @@ import (
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/features"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/management"
@@ -133,6 +134,9 @@ type beatConfig struct {
// monitoring settings
MonitoringBeatConfig monitoring.BeatConfig `config:",inline"`

// lifecycle (ILM and DSL) settings
LifecycleConfig lifecycle.RawConfig `config:",inline"`

// central management settings
Management *config.C `config:"management"`

@@ -671,7 +675,13 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
if setup.IndexManagement || setup.ILMPolicy {
loadILM = idxmgmt.LoadModeEnabled
}
m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields))

mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Config.LifecycleConfig)
if err != nil {
return fmt.Errorf("error creating index management handler: %w", err)
}

m := b.IdxSupporter.Manager(mgmtHandler, idxmgmt.BeatsAssets(b.Fields))
if ok, warn := m.VerifySetup(loadTemplate, loadILM); !ok {
fmt.Println(warn)
}
@@ -1065,7 +1075,11 @@ func (b *Beat) registerESIndexManagement() error {

func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
return func(esClient *eslegclient.Connection) error {
m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields))
mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Config.LifecycleConfig)
if err != nil {
return fmt.Errorf("error creating index management handler: %w", err)
}
m := b.IdxSupporter.Manager(mgmtHandler, idxmgmt.BeatsAssets(b.Fields))
return m.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeEnabled)
}
}
4 changes: 2 additions & 2 deletions libbeat/cmd/instance/settings.go
@@ -22,7 +22,7 @@ import (

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
"github.com/elastic/beats/v7/libbeat/monitoring/report"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
)
@@ -42,7 +42,7 @@ type Settings struct {

// load custom index manager. The config object will be the Beats root configuration.
IndexManagement idxmgmt.SupportFactory
ILM ilm.SupportFactory
ILM lifecycle.SupportFactory

Processing processing.SupportFactory

29 changes: 14 additions & 15 deletions libbeat/common/fmtstr/formatevents.go
@@ -84,7 +84,6 @@ type eventEvalContext struct {
}

var (
errMissingKeys = errors.New("missing keys")
errConvertString = errors.New("can not convert to string")
)

@@ -157,19 +156,24 @@ func CompileEvent(in string) (*EventFormatString, error) {
func (fs *EventFormatString) Unpack(v interface{}) error {
s, err := tryConvString(v)
if err != nil {
return err
return fmt.Errorf("error converting type %T to event formatter: %w", v, err)
}

tmp, err := CompileEvent(s)
if err != nil {
return err
return fmt.Errorf("error compiling event formatter: %w", err)
}

// init fs from tmp
*fs = *tmp
return nil
}

// IsInitialized returns true if the underlying event formatter is prepared to format an event
func (fs *EventFormatString) IsInitialized() bool {
return fs.formatter != nil
}

// NumFields returns number of unique event fields used by the format string.
func (fs *EventFormatString) NumFields() int {
return len(fs.fields)
@@ -190,6 +194,9 @@ func (fs *EventFormatString) Fields() []string {
// Run executes the format string returning a new expanded string or an error
// if execution or event field expansion fails.
func (fs *EventFormatString) Run(event *beat.Event) (string, error) {
if !fs.IsInitialized() {
return "", fmt.Errorf("event formatter is nil")
}
ctx := newEventCtx(len(fs.fields))
defer releaseCtx(ctx)

@@ -296,7 +303,7 @@ func (e *eventFieldCompiler) compileEventField(
ops []VariableOp,
) (FormatEvaler, error) {
if len(ops) > 1 {
return nil, errors.New("Too many format modifiers given")
return nil, errors.New("too many format modifiers given")
}

defaultValue := ""
@@ -340,34 +347,26 @@ func (e *eventFieldCompiler) compileTimestamp(
ops []VariableOp,
) (FormatEvaler, error) {
if expression[0] != '+' {
return nil, errors.New("No timestamp expression")
return nil, errors.New("no timestamp expression")
}

formatter, err := dtfmt.NewFormatter(expression[1:])
if err != nil {
return nil, fmt.Errorf("%v in timestamp expression", err)
return nil, fmt.Errorf("%w in timestamp expression", err)
}

e.timestamp = true
return &eventTimestampEvaler{formatter}, nil
}

func (e *eventFieldEvaler) Eval(c interface{}, out *bytes.Buffer) error {
type stringer interface {
String() string
}

ctx := c.(*eventEvalContext)
s := ctx.keys[e.index]
_, err := out.WriteString(s)
return err
}

func (e *defaultEventFieldEvaler) Eval(c interface{}, out *bytes.Buffer) error {
type stringer interface {
String() string
}

ctx := c.(*eventEvalContext)
s := ctx.keys[e.index]
if s == "" {
@@ -385,7 +384,7 @@ func (e *eventTimestampEvaler) Eval(c interface{}, out *bytes.Buffer) error {

func parseEventPath(field string) (string, error) {
field = strings.Trim(field, " \n\r\t")
var fields []string
fields := []string{}

for len(field) > 0 {
if field[0] != '[' {
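The formatevents.go changes above introduce IsInitialized and a guard in Run, so a zero-value EventFormatString (one that was never compiled or unpacked from config) returns an error rather than reaching its nil internal formatter. A minimal sketch of the resulting behavior, assuming only the exported functions visible in this diff (CompileEvent, IsInitialized, Run) and the import paths used elsewhere in it:

```go
package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/fmtstr"
)

func main() {
	// A zero-value formatter was never compiled or unpacked from config,
	// so its internal formatter is nil.
	var empty fmtstr.EventFormatString
	fmt.Println(empty.IsInitialized()) // false

	// With the new guard, Run reports the problem instead of touching the nil formatter.
	if _, err := empty.Run(&beat.Event{}); err != nil {
		fmt.Println("run failed:", err) // "event formatter is nil"
	}

	// A successfully compiled format string reports itself as initialized.
	compiled, err := fmtstr.CompileEvent("%{[agent.version]}")
	if err != nil {
		panic(err)
	}
	fmt.Println(compiled.IsInitialized()) // true
}
```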
4 changes: 2 additions & 2 deletions libbeat/dashboards/kibana_loader.go
@@ -61,7 +61,7 @@ type KibanaLoader struct {
// NewKibanaLoader creates a new loader to load Kibana files
func NewKibanaLoader(ctx context.Context, cfg *config.C, dashboardsConfig *Config, hostname string, msgOutputter MessageOutputter, beatname string) (*KibanaLoader, error) {
if cfg == nil || !cfg.Enabled() {
return nil, fmt.Errorf("Kibana is not configured or enabled")
return nil, fmt.Errorf("kibana is not configured or enabled")
}

client, err := getKibanaClient(ctx, cfg, dashboardsConfig.Retry, 0, beatname)
@@ -127,7 +127,7 @@ func (loader KibanaLoader) ImportIndexFile(file string) error {
// ImportIndex imports the passed index pattern to Kibana
func (loader KibanaLoader) ImportIndex(pattern mapstr.M) error {
if loader.version.LessThan(minimumRequiredVersionSavedObjects) {
return fmt.Errorf("Kibana version must be at least " + minimumRequiredVersionSavedObjects.String())
return fmt.Errorf("kibana version must be at least " + minimumRequiredVersionSavedObjects.String())
}

var errs multierror.Errors
1 change: 1 addition & 0 deletions libbeat/esleg/eslegclient/connection.go
@@ -423,6 +423,7 @@ func (conn *Connection) getVersion() error {
}

if versionData.Version.BuildFlavor == "serverless" {
conn.log.Info("build flavor of Elasticsearch is serverless, marking connection as serverless")
conn.isServerless = true
} else if versionData.Version.BuildFlavor == "default" {
conn.isServerless = false
(The remaining changed files are not shown here.)
