diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 060c4702dad3..e3d29c2e61c1 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -40,6 +40,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" @@ -80,7 +81,9 @@ type Filebeat struct { type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin type StateStore interface { - Access() (*statestore.Store, error) + // Access returns the storage registry depending on the type. This is needed for the Elasticsearch state store which + // is guarded by the feature.IsElasticsearchStateStoreEnabledForInput(typ) check. + Access(typ string) (*statestore.Store, error) CleanupInterval() time.Duration } @@ -299,13 +302,36 @@ func (fb *Filebeat) Run(b *beat.Beat) error { return err } - stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry) + // Use context, like normal people do, hooking up to the beat.done channel + ctx, cn := context.WithCancel(context.Background()) + go func() { + <-fb.done + cn() + }() + + stateStore, err := openStateStore(ctx, b.Info, logp.NewLogger("filebeat"), config.Registry) if err != nil { logp.Err("Failed to open state store: %+v", err) return err } defer stateStore.Close() + // If notifier is set, configure the listener for output configuration + // The notifier passes the elasticsearch output configuration down to the Elasticsearch backed state storage + // in order to allow it fully configure + if stateStore.notifier != nil { + b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error { + outCfg := conf.Namespace{} + if err := r.Config.Unpack(&outCfg); err != nil || 
outCfg.Name() != "elasticsearch" { + logp.Err("Failed to unpack the output config: %v", err) + return nil + } + + stateStore.notifier.Notify(outCfg.Config()) + return nil + }) + } + err = processLogInputTakeOver(stateStore, config) if err != nil { logp.Err("Failed to attempt filestream state take over: %+v", err) @@ -351,6 +377,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error { defer func() { _ = inputTaskGroup.Stop() }() + + // Store needs to be fully configured at this point if err := v2InputLoader.Init(&inputTaskGroup); err != nil { logp.Err("Failed to initialize the input managers: %v", err) return err @@ -535,7 +563,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error { return nil } - store, err := stateStore.Access() + store, err := stateStore.Access("") if err != nil { return fmt.Errorf("Failed to access state when attempting take over: %w", err) } diff --git a/filebeat/beater/store.go b/filebeat/beater/store.go index 745c507d6e5d..a32e248aba85 100644 --- a/filebeat/beater/store.go +++ b/filebeat/beater/store.go @@ -18,11 +18,15 @@ package beater import ( + "context" "time" "github.com/elastic/beats/v7/filebeat/config" + "github.com/elastic/beats/v7/filebeat/features" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/beats/v7/libbeat/statestore/backend/es" "github.com/elastic/beats/v7/libbeat/statestore/backend/memlog" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/paths" @@ -30,12 +34,31 @@ import ( type filebeatStore struct { registry *statestore.Registry + esRegistry *statestore.Registry storeName string cleanInterval time.Duration + + // Notifies the Elasticsearch store about configuration change + // which is available only after the beat runtime manager connects to the Agent + // and receives the output configuration + notifier *es.Notifier } -func 
openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) { - memlog, err := memlog.New(logger, memlog.Settings{ +func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) { + var ( + reg backend.Registry + err error + + esreg *es.Registry + notifier *es.Notifier + ) + + if features.IsElasticsearchStateStoreEnabled() { + notifier = es.NewNotifier() + esreg = es.New(ctx, logger, notifier) + } + + reg, err = memlog.New(logger, memlog.Settings{ Root: paths.Resolve(paths.Data, cfg.Path), FileMode: cfg.Permissions, }) @@ -43,18 +66,29 @@ func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (* return nil, err } - return &filebeatStore{ - registry: statestore.NewRegistry(memlog), + store := &filebeatStore{ + registry: statestore.NewRegistry(reg), storeName: info.Beat, cleanInterval: cfg.CleanInterval, - }, nil + notifier: notifier, + } + + if esreg != nil { + store.esRegistry = statestore.NewRegistry(esreg) + } + + return store, nil } func (s *filebeatStore) Close() { s.registry.Close() } -func (s *filebeatStore) Access() (*statestore.Store, error) { +// Access returns the storage registry depending on the type. Default is the file store. +func (s *filebeatStore) Access(typ string) (*statestore.Store, error) { + if features.IsElasticsearchStateStoreEnabledForInput(typ) && s.esRegistry != nil { + return s.esRegistry.Get(s.storeName) + } return s.registry.Get(s.storeName) } diff --git a/filebeat/features/features.go b/filebeat/features/features.go new file mode 100644 index 000000000000..803aa5b5bdeb --- /dev/null +++ b/filebeat/features/features.go @@ -0,0 +1,59 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package features + +import ( + "os" + "strings" +) + +// List of input types Elasticsearch state store is enabled for +var esTypesEnabled map[string]struct{} + +var isESEnabled bool + +func init() { + initFromEnv("AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES") +} + +func initFromEnv(envName string) { + esTypesEnabled = make(map[string]struct{}) + + arr := strings.Split(os.Getenv(envName), ",") + for _, e := range arr { + k := strings.TrimSpace(e) + if k != "" { + esTypesEnabled[k] = struct{}{} + } + } + isESEnabled = len(esTypesEnabled) > 0 +} + +// IsElasticsearchStateStoreEnabled returns true if feature is enabled for agentless +func IsElasticsearchStateStoreEnabled() bool { + return isESEnabled +} + +// IsElasticsearchStateStoreEnabledForInput returns true if the provided input type uses Elasticsearch for state storage if the Elasticsearch state store feature is enabled +func IsElasticsearchStateStoreEnabledForInput(inputType string) bool { + if IsElasticsearchStateStoreEnabled() { + _, ok := esTypesEnabled[inputType] + return ok + } + return false +} diff --git a/filebeat/features/features_test.go b/filebeat/features/features_test.go new file mode 100644 index 000000000000..00702ae379e3 --- /dev/null +++ b/filebeat/features/features_test.go @@ -0,0 +1,86 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package features + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_initFromEnv(t *testing.T) { + const envName = "TEST_AGENTLESS_ENV" + + t.Run("Without setting env", func(t *testing.T) { + // default init + assert.False(t, IsElasticsearchStateStoreEnabled()) + assert.Empty(t, esTypesEnabled) + assert.False(t, IsElasticsearchStateStoreEnabledForInput("xxx")) + + // init from env + initFromEnv(envName) + assert.False(t, IsElasticsearchStateStoreEnabled()) + assert.Empty(t, esTypesEnabled) + assert.False(t, IsElasticsearchStateStoreEnabledForInput("xxx")) + }) + + tests := []struct { + name string + value string + wantEnabled bool + wantContains []string + }{ + { + name: "Empty", + value: "", + wantEnabled: false, + wantContains: nil, + }, + { + name: "Single value", + value: "xxx", + wantEnabled: true, + wantContains: []string{"xxx"}, + }, + { + name: "Multiple values", + value: "xxx,yyy", + wantEnabled: true, + wantContains: []string{"xxx", "yyy"}, + }, + { + name: "Multiple values with spaces", + value: ",,, , xxx , yyy, ,,,,", + wantEnabled: true, + wantContains: []string{"xxx", "yyy"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Setenv(envName, tt.value) + initFromEnv(envName) + + 
assert.Equal(t, tt.wantEnabled, IsElasticsearchStateStoreEnabled()) + for _, contain := range tt.wantContains { + assert.Contains(t, esTypesEnabled, contain) + assert.True(t, IsElasticsearchStateStoreEnabledForInput(contain)) + } + assert.Len(t, esTypesEnabled, len(tt.wantContains)) + }) + } +} diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index a4dfd2c15fe3..0254420a0bf0 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -194,7 +194,7 @@ func (e *inputTestingEnvironment) abspath(filename string) string { } func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) { - inputStore, _ := e.stateStore.Access() + inputStore, _ := e.stateStore.Access("") actual := 0 err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) { @@ -331,7 +331,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str e.t.Fatalf("cannot stat file when cheking for offset: %+v", err) } - inputStore, _ := e.stateStore.Access() + inputStore, _ := e.stateStore.Access("") id := getIDFromPath(filepath, inputID, fi) var entry registryEntry @@ -352,7 +352,7 @@ func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expect } func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) { - inputStore, _ := e.stateStore.Access() + inputStore, _ := e.stateStore.Access("") var entry registryEntry err := inputStore.Get(key, &entry) @@ -553,7 +553,7 @@ func (s *testInputStore) Close() { s.registry.Close() } -func (s *testInputStore) Access() (*statestore.Store, error) { +func (s *testInputStore) Access(_ string) (*statestore.Store, error) { return s.registry.Get("filebeat") } diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go index 735ea0d0ffe7..970c6c7e7a56 100644 --- a/filebeat/input/filestream/input_test.go +++ 
b/filebeat/input/filestream/input_test.go @@ -247,7 +247,7 @@ func (s *testStore) Close() { s.registry.Close() } -func (s *testStore) Access() (*statestore.Store, error) { +func (s *testStore) Access(_ string) (*statestore.Store, error) { return s.registry.Get("filestream-benchmark") } diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index c65ccb5e3089..ccac725b3d1b 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -88,7 +88,7 @@ const globalInputID = ".global" // StateStore interface and configurations used to give the Manager access to the persistent store. type StateStore interface { - Access() (*statestore.Store, error) + Access(typ string) (*statestore.Store, error) CleanupInterval() time.Duration } diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index 85f40d1f3a33..9a65f0cd011d 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -144,7 +144,7 @@ var closeStore = (*store).close func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) { ok := false - persistentStore, err := statestore.Access() + persistentStore, err := statestore.Access("") if err != nil { return nil, err } diff --git a/filebeat/input/filestream/internal/input-logfile/store_test.go b/filebeat/input/filestream/internal/input-logfile/store_test.go index 2d4f98b5d29b..ac77fc2c2942 100644 --- a/filebeat/input/filestream/internal/input-logfile/store_test.go +++ b/filebeat/input/filestream/internal/input-logfile/store_test.go @@ -508,7 +508,7 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore { func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts } func (ts 
testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod } -func (ts testStateStore) Access() (*statestore.Store, error) { +func (ts testStateStore) Access(string) (*statestore.Store, error) { if ts.Store == nil { return nil, errors.New("no store configured") } diff --git a/filebeat/input/journald/environment_test.go b/filebeat/input/journald/environment_test.go index 57f75163e926..9ea77d017d15 100644 --- a/filebeat/input/journald/environment_test.go +++ b/filebeat/input/journald/environment_test.go @@ -139,7 +139,7 @@ func (s *testInputStore) Close() { s.registry.Close() } -func (s *testInputStore) Access() (*statestore.Store, error) { +func (s *testInputStore) Access(_ string) (*statestore.Store, error) { return s.registry.Get("filebeat") } diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go index 220f71e2d9ba..9464016d40dd 100644 --- a/filebeat/input/journald/input_filtering_test.go +++ b/filebeat/input/journald/input_filtering_test.go @@ -256,7 +256,7 @@ func TestInputSeek(t *testing.T) { env := newInputTestingEnvironment(t) if testCase.cursor != "" { - store, _ := env.stateStore.Access() + store, _ := env.stateStore.Access("") tmp := map[string]any{} if err := json.Unmarshal([]byte(testCase.cursor), &tmp); err != nil { t.Fatal(err) diff --git a/filebeat/input/v2/input-cursor/manager.go b/filebeat/input/v2/input-cursor/manager.go index 1d5578a71223..f8d86054054e 100644 --- a/filebeat/input/v2/input-cursor/manager.go +++ b/filebeat/input/v2/input-cursor/manager.go @@ -21,11 +21,11 @@ import ( "context" "errors" "fmt" - "sync" "time" "github.com/elastic/go-concert/unison" + "github.com/elastic/beats/v7/filebeat/features" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" @@ -63,9 +63,9 @@ type InputManager struct { // that will be used to collect events from each source. 
Configure func(cfg *conf.C) ([]Source, Input, error) - initOnce sync.Once - initErr error - store *store + initedFull bool + initErr error + store *store } // Source describe a source the input can collect data from. @@ -82,25 +82,38 @@ var ( // StateStore interface and configurations used to give the Manager access to the persistent store. type StateStore interface { - Access() (*statestore.Store, error) + Access(typ string) (*statestore.Store, error) CleanupInterval() time.Duration } -func (cim *InputManager) init() error { - cim.initOnce.Do(func() { - if cim.DefaultCleanTimeout <= 0 { - cim.DefaultCleanTimeout = 30 * time.Minute - } +// init initializes the state store +// This function is called from: +// 1. InputManager::Init on beat start +// 2. InputManager::Create when the input is initialized with configuration +// When Elasticsearch state storage is used for the input it will be only fully configured on InputManager::Create, +// so skip reading the state from the storage on InputManager::Init in this case +func (cim *InputManager) init(inputID string) error { + if cim.initedFull { + return nil + } - log := cim.Logger.With("input_type", cim.Type) - var store *store - store, cim.initErr = openStore(log, cim.StateStore, cim.Type) - if cim.initErr != nil { - return - } + if cim.DefaultCleanTimeout <= 0 { + cim.DefaultCleanTimeout = 30 * time.Minute + } - cim.store = store - }) + log := cim.Logger.With("input_type", cim.Type) + var store *store + useES := features.IsElasticsearchStateStoreEnabledForInput(cim.Type) + fullInit := !useES || inputID != "" + store, cim.initErr = openStore(log, cim.StateStore, cim.Type, inputID, fullInit) + if cim.initErr != nil { + return cim.initErr + } + + cim.store = store + if fullInit { + cim.initedFull = true + } return cim.initErr } @@ -108,7 +121,7 @@ func (cim *InputManager) init() error { // Init starts background processes for deleting old entries from the // persistent store if mode is ModeRun. 
func (cim *InputManager) Init(group unison.Group) error { - if err := cim.init(); err != nil { + if err := cim.init(""); err != nil { return err } @@ -143,10 +156,6 @@ func (cim *InputManager) shutdown() { // Create builds a new v2.Input using the provided Configure function. // The Input will run a go-routine per source that has been configured. func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { - if err := cim.init(); err != nil { - return nil, err - } - settings := struct { ID string `config:"id"` CleanInactive time.Duration `config:"clean_inactive"` @@ -155,6 +164,10 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { return nil, err } + if err := cim.init(settings.ID); err != nil { + return nil, err + } + sources, inp, err := cim.Configure(config) if err != nil { return nil, err diff --git a/filebeat/input/v2/input-cursor/store.go b/filebeat/input/v2/input-cursor/store.go index a53bc77a79f9..936735946b0b 100644 --- a/filebeat/input/v2/input-cursor/store.go +++ b/filebeat/input/v2/input-cursor/store.go @@ -127,16 +127,18 @@ type ( // hook into store close for testing purposes var closeStore = (*store).close -func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) { +func openStore(log *logp.Logger, statestore StateStore, prefix string, inputID string, fullInit bool) (*store, error) { ok := false - persistentStore, err := statestore.Access() + log.Debugf("input-cursor::openStore: prefix: %v inputID: %s", prefix, inputID) + persistentStore, err := statestore.Access(prefix) if err != nil { return nil, err } defer cleanup.IfNot(&ok, func() { persistentStore.Close() }) + persistentStore.SetID(inputID) - states, err := readStates(log, persistentStore, prefix) + states, err := readStates(log, persistentStore, prefix, fullInit) if err != nil { return nil, err } @@ -283,41 +285,44 @@ func (r *resource) stateSnapshot() state { } } -func readStates(log *logp.Logger, store *statestore.Store, prefix string) 
(*states, error) { +func readStates(log *logp.Logger, store *statestore.Store, prefix string, fullInit bool) (*states, error) { keyPrefix := prefix + "::" states := &states{ table: map[string]*resource{}, } - err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { - if !strings.HasPrefix(key, keyPrefix) { - return true, nil - } + if fullInit { + err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + if !strings.HasPrefix(key, keyPrefix) { + return true, nil + } + + var st state + if err := dec.Decode(&st); err != nil { + log.Errorf("Failed to read registry state for '%v', cursor state will be ignored. Error was: %+v", + key, err) + return true, nil + } + + resource := &resource{ + key: key, + stored: true, + lock: unison.MakeMutex(), + internalInSync: true, + internalState: stateInternal{ + TTL: st.TTL, + Updated: st.Updated, + }, + cursor: st.Cursor, + } + states.table[resource.key] = resource - var st state - if err := dec.Decode(&st); err != nil { - log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. 
Error was: %+v", - key, err) return true, nil + }) + log.Debugf("input-cursor store read %d keys", len(states.table)) + if err != nil { + return nil, err } - - resource := &resource{ - key: key, - stored: true, - lock: unison.MakeMutex(), - internalInSync: true, - internalState: stateInternal{ - TTL: st.TTL, - Updated: st.Updated, - }, - cursor: st.Cursor, - } - states.table[resource.key] = resource - - return true, nil - }) - if err != nil { - return nil, err } return states, nil } diff --git a/filebeat/input/v2/input-cursor/store_test.go b/filebeat/input/v2/input-cursor/store_test.go index fc1d57fac3ee..b7fbba9c8ad6 100644 --- a/filebeat/input/v2/input-cursor/store_test.go +++ b/filebeat/input/v2/input-cursor/store_test.go @@ -52,7 +52,7 @@ func TestStore_OpenClose(t *testing.T) { }) t.Run("fail if persistent store can not be accessed", func(t *testing.T) { - _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test") + _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test", "", true) require.Error(t, err) }) @@ -240,7 +240,7 @@ func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *sto persistentStore = createSampleStore(t, nil) } - store, err := openStore(logp.NewLogger("test"), persistentStore, prefix) + store, err := openStore(logp.NewLogger("test"), persistentStore, prefix, "", true) if err != nil { t.Fatalf("failed to open the store") } @@ -267,7 +267,7 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore { func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts } func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod } -func (ts testStateStore) Access() (*statestore.Store, error) { +func (ts testStateStore) Access(_ string) (*statestore.Store, error) { if ts.Store == nil { return nil, errors.New("no store configured") } diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 
133a4fbdd695..8982c6d58609 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -55,7 +55,7 @@ type successLogger interface { } type StateStore interface { - Access() (*statestore.Store, error) + Access(typ string) (*statestore.Store, error) } var ( @@ -72,7 +72,7 @@ const fileStatePrefix = "filebeat::logs::" // New creates a new Registrar instance, updating the registry file on // `file.State` updates. New fails if the file can not be opened or created. func New(stateStore StateStore, out successLogger, flushTimeout time.Duration) (*Registrar, error) { - store, err := stateStore.Access() + store, err := stateStore.Access("") if err != nil { return nil, err } diff --git a/libbeat/statestore/backend/backend.go b/libbeat/statestore/backend/backend.go index c40d8515977d..c58ad173a3b1 100644 --- a/libbeat/statestore/backend/backend.go +++ b/libbeat/statestore/backend/backend.go @@ -42,7 +42,7 @@ type Store interface { Close() error // Has checks if the key exists. No error must be returned if the key does - // not exists, but the bool return must be false. + // not exist, but the bool return must be false. // An error return value must indicate internal errors only. The store is // assumed to be in a 'bad' but recoverable state if 'Has' fails. Has(key string) (bool, error) @@ -68,4 +68,8 @@ type Store interface { // is assumed to be invalidated once fn returns // The loop shall return if fn returns an error or false. Each(fn func(string, ValueDecoder) (bool, error)) error + + // SetID Sets the store ID when the full input configuration is acquired. + // This is needed in order to support Elasticsearch state store naming convention based on the input ID. + SetID(id string) } diff --git a/libbeat/statestore/backend/es/error.go b/libbeat/statestore/backend/es/error.go new file mode 100644 index 000000000000..df8b1a734d6f --- /dev/null +++ b/libbeat/statestore/backend/es/error.go @@ -0,0 +1,24 @@ +// Licensed to Elasticsearch B.V. 
under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package es + +import "errors" + +var ( + ErrKeyUnknown = errors.New("key unknown") +) diff --git a/libbeat/statestore/backend/es/notifier.go b/libbeat/statestore/backend/es/notifier.go new file mode 100644 index 000000000000..153883cf18f8 --- /dev/null +++ b/libbeat/statestore/backend/es/notifier.go @@ -0,0 +1,77 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package es + +import ( + "sync" + + conf "github.com/elastic/elastic-agent-libs/config" +) + +type OnConfigUpdateFunc func(c *conf.C) +type UnsubscribeFunc func() + +type Notifier struct { + mx sync.Mutex + + lastConfig *conf.C + listeners map[int]OnConfigUpdateFunc + id int +} + +func NewNotifier() *Notifier { + return &Notifier{ + listeners: make(map[int]OnConfigUpdateFunc), + id: 0, + } +} + +// Subscribe adds a listener to the notifier. The listener will be called when Notify is called. +// Each OnConfigUpdateFunc is called asynchronously in a separate goroutine in each Notify call. +// +// Returns an UnsubscribeFunc that can be used to remove the listener. +// +// Note: Subscribe will call the listener with the last config that was passed to Notify. +func (n *Notifier) Subscribe(fn OnConfigUpdateFunc) UnsubscribeFunc { + n.mx.Lock() + defer n.mx.Unlock() + + id := n.id + n.id++ + n.listeners[id] = fn + + if n.lastConfig != nil { + go fn(n.lastConfig) + } + + return func() { + n.mx.Lock() + defer n.mx.Unlock() + delete(n.listeners, id) + } +} + +func (n *Notifier) Notify(c *conf.C) { + n.mx.Lock() + defer n.mx.Unlock() + n.lastConfig = c + + for _, listener := range n.listeners { + go listener(c) + } +} diff --git a/libbeat/statestore/backend/es/notifier_test.go b/libbeat/statestore/backend/es/notifier_test.go new file mode 100644 index 000000000000..290508411ab3 --- /dev/null +++ b/libbeat/statestore/backend/es/notifier_test.go @@ -0,0 +1,211 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package es + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + conf "github.com/elastic/elastic-agent-libs/config" +) + +func createTestConfigs(t *testing.T, n int) []*conf.C { + var res []*conf.C + for i := 0; i < n; i++ { + c, err := conf.NewConfigFrom(map[string]any{ + "id": i, + }) + require.NoError(t, err) + require.NotNil(t, c) + id, err := c.Int("id", -1) + require.NoError(t, err, "sanity check: id is stored") + require.Equal(t, int64(i), id, "sanity check: id is correct") + res = append(res, c) + } + return res +} + +func wgWait(t *testing.T, wg *sync.WaitGroup) { + const timeout = 1 * time.Second + t.Helper() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + return + case <-time.After(timeout): + require.Fail(t, "timeout waiting for WaitGroup") + } +} + +func TestSanity(t *testing.T) { + assert.Equal(t, createTestConfigs(t, 5), createTestConfigs(t, 5)) + assert.NotEqual(t, createTestConfigs(t, 4), createTestConfigs(t, 5)) + assert.NotEqual(t, createTestConfigs(t, 5)[3], createTestConfigs(t, 5)[4]) +} + +func TestSubscribeAndNotify(t *testing.T) { + notifier := NewNotifier() + + var ( + wg sync.WaitGroup + mx sync.Mutex + receivedFirst []*conf.C + receivedSecond []*conf.C + ) + + unsubFirst := notifier.Subscribe(func(c *conf.C) { + defer wg.Done() + mx.Lock() + defer mx.Unlock() + receivedFirst = append(receivedFirst, c) + }) + defer unsubFirst() + + unsubSecond := notifier.Subscribe(func(c *conf.C) { + 
defer wg.Done() + mx.Lock() + defer mx.Unlock() + receivedSecond = append(receivedSecond, c) + }) + defer unsubSecond() + + const totalNotifications = 3 + + configs := createTestConfigs(t, totalNotifications) + + wg.Add(totalNotifications * 2) + for _, config := range configs { + notifier.Notify(config) + } + + wgWait(t, &wg) + assert.ElementsMatch(t, configs, receivedFirst) + assert.ElementsMatch(t, configs, receivedSecond) + + // Receive old config + wg.Add(1) + notifier.Subscribe(func(c *conf.C) { + defer wg.Done() + mx.Lock() + defer mx.Unlock() + }) + wgWait(t, &wg) +} + +func TestUnsubscribe(t *testing.T) { + var ( + wg sync.WaitGroup + mx sync.Mutex + receivedFirst, receivedSecond []*conf.C + ) + + notifier := NewNotifier() + + unsubFirst := notifier.Subscribe(func(c *conf.C) { + defer wg.Done() + mx.Lock() + defer mx.Unlock() + receivedFirst = append(receivedFirst, c) + }) + defer unsubFirst() + + unsubSecond := notifier.Subscribe(func(c *conf.C) { + defer wg.Done() + mx.Lock() + defer mx.Unlock() + receivedSecond = append(receivedSecond, c) + }) + defer unsubSecond() + + const totalNotifications = 3 + + configs := createTestConfigs(t, totalNotifications) + + // Unsubscribe first + unsubFirst() + + // Notify + wg.Add(totalNotifications) + for _, config := range configs { + notifier.Notify(config) + } + + wgWait(t, &wg) + assert.Empty(t, receivedFirst) + assert.ElementsMatch(t, configs, receivedSecond) +} + +func TestConcurrentSubscribeAndNotify(t *testing.T) { + notifier := NewNotifier() + + var ( + wg, wgSub sync.WaitGroup + mx, mxSub sync.Mutex + received []*conf.C + unsubFns []UnsubscribeFunc + ) + + // Concurrent subscribers + const count = 10 + wgSub.Add(count) + wg.Add(count) + for i := 0; i < count; i++ { + go func() { + defer wgSub.Done() + mxSub.Lock() + defer mxSub.Unlock() + unsub := notifier.Subscribe(func(c *conf.C) { + defer wg.Done() + mx.Lock() + defer mx.Unlock() + received = append(received, c) + }) + unsubFns = append(unsubFns, unsub) + 
}() + } + defer func() { + for _, unsubFn := range unsubFns { + unsubFn() + } + }() + + // Wait for all subscribers to be added + wgWait(t, &wgSub) + + // Notify + c := createTestConfigs(t, 1)[0] + notifier.Notify(c) + + // Wait for all + wgWait(t, &wg) + expected := make([]*conf.C, count) + for i := 0; i < count; i++ { + expected[i] = c + } + assert.Equal(t, expected, received) +} diff --git a/libbeat/statestore/backend/es/registry.go b/libbeat/statestore/backend/es/registry.go new file mode 100644 index 000000000000..42ef973a2bbf --- /dev/null +++ b/libbeat/statestore/backend/es/registry.go @@ -0,0 +1,53 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package es + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/elastic-agent-libs/logp" +) + +type Registry struct { + ctx context.Context + + log *logp.Logger + mx sync.Mutex + + notifier *Notifier +} + +func New(ctx context.Context, log *logp.Logger, notifier *Notifier) *Registry { + return &Registry{ + ctx: ctx, + log: log, + notifier: notifier, + } +} + +func (r *Registry) Access(name string) (backend.Store, error) { + r.mx.Lock() + defer r.mx.Unlock() + return openStore(r.ctx, r.log, name, r.notifier) +} + +func (r *Registry) Close() error { + return nil +} diff --git a/libbeat/statestore/backend/es/store.go b/libbeat/statestore/backend/es/store.go new file mode 100644 index 000000000000..fee1e0c9ba48 --- /dev/null +++ b/libbeat/statestore/backend/es/store.go @@ -0,0 +1,340 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package es + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "sync" + "time" + + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/statestore/backend" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +// The current typical usage of the state storage is such that the consumer +// of the storage fetches all the keys and caches them at the start of the beat. +// Then the key value gets updated (Set is called) possibly frequently, so we want these operations to happen fairly fast +// and not block waiting on Elasticsearch refresh, thus the slight trade-off for performance instead of consistency. +// The value is not normally retrieved after a modification, so the inconsistency (potential refresh delay) is acceptable for our use cases. +// +// If consistency becomes a strict requirement, the storage would need to implement possibly some caching mechanism +// that would guarantee the consistency between Set/Remove/Get/Each operations at least for a given "in-memory" instance of the storage. 
+
+type store struct {
+	ctx      context.Context
+	cn       context.CancelFunc
+	log      *logp.Logger
+	name     string
+	index    string
+	notifier *Notifier
+
+	chReady chan struct{}
+	once    sync.Once
+
+	mx     sync.Mutex
+	cli    *eslegclient.Connection
+	cliErr error
+}
+
+const docType = "_doc"
+
+func openStore(ctx context.Context, log *logp.Logger, name string, notifier *Notifier) (*store, error) {
+	ctx, cn := context.WithCancel(ctx)
+	s := &store{
+		ctx:      ctx,
+		cn:       cn,
+		log:      log.With("name", name).With("backend", "elasticsearch"),
+		name:     name,
+		index:    renderIndexName(name),
+		notifier: notifier,
+		chReady:  make(chan struct{}),
+	}
+
+	chCfg := make(chan *conf.C)
+
+	unsubFn := s.notifier.Subscribe(func(c *conf.C) {
+		select {
+		case chCfg <- c:
+		case <-ctx.Done():
+		}
+	})
+
+	go s.loop(ctx, cn, unsubFn, chCfg)
+
+	return s, nil
+}
+
+func renderIndexName(name string) string {
+	return "agentless-state-" + name
+}
+
+func (s *store) waitReady() error {
+	select {
+	case <-s.ctx.Done():
+		return s.ctx.Err()
+	case <-s.chReady:
+		return s.cliErr
+	}
+}
+
+func (s *store) SetID(id string) {
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	if id == "" {
+		return
+	}
+	s.index = renderIndexName(id)
+}
+
+func (s *store) Close() error {
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	if s.cn != nil {
+		s.cn()
+	}
+
+	if s.cli != nil {
+		err := s.cli.Close()
+		s.cli = nil
+		return err
+	}
+	return nil
+}
+
+func (s *store) Has(key string) (bool, error) {
+	if err := s.waitReady(); err != nil {
+		return false, err
+	}
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	var v interface{}
+	err := s.get(key, &v)
+	if err != nil {
+		if errors.Is(err, ErrKeyUnknown) {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+func (s *store) Get(key string, to interface{}) error {
+	if err := s.waitReady(); err != nil {
+		return err
+	}
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	return s.get(key, to)
+}
+
+func (s *store) get(key string, to interface{}) error {
+	status, data, err := s.cli.Request("GET",
fmt.Sprintf("/%s/%s/%s", s.index, docType, url.QueryEscape(key)), "", nil, nil) + + if err != nil { + if status == http.StatusNotFound { + return ErrKeyUnknown + } + return err + } + + var qr queryResult + err = json.Unmarshal(data, &qr) + if err != nil { + return err + } + + err = json.Unmarshal(qr.Source.Value, to) + if err != nil { + return err + } + return nil +} + +type queryResult struct { + Found bool `json:"found"` + Source struct { + Value json.RawMessage `json:"v"` + } `json:"_source"` +} + +type doc struct { + Value any `struct:"v"` + UpdatedAt any `struct:"updated_at"` +} + +type entry struct { + value interface{} +} + +func (e entry) Decode(to interface{}) error { + return typeconv.Convert(to, e.value) +} + +func renderRequest(val interface{}) doc { + return doc{ + Value: val, + UpdatedAt: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), + } +} + +func (s *store) Set(key string, value interface{}) error { + if err := s.waitReady(); err != nil { + return err + } + s.mx.Lock() + defer s.mx.Unlock() + + doc := renderRequest(value) + _, _, err := s.cli.Request("PUT", fmt.Sprintf("/%s/%s/%s", s.index, docType, url.QueryEscape(key)), "", nil, doc) + if err != nil { + return err + } + return nil +} + +func (s *store) Remove(key string) error { + if err := s.waitReady(); err != nil { + return err + } + s.mx.Lock() + defer s.mx.Unlock() + + _, _, err := s.cli.Delete(s.index, docType, url.QueryEscape(key), nil) + if err != nil { + return err + } + return nil +} + +type searchResult struct { + ID string `json:"_id"` + Source struct { + Value json.RawMessage `json:"v"` + } `json:"_source"` +} + +func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error { + if err := s.waitReady(); err != nil { + return err + } + + s.mx.Lock() + defer s.mx.Unlock() + + // Do nothing for now if the store was not initialized + if s.cli == nil { + return nil + } + + status, result, err := s.cli.SearchURIWithBody(s.index, "", nil, map[string]any{ + "query": 
map[string]any{ + "match_all": map[string]any{}, + }, + "size": 1000, // TODO: we might have to do scroll if there are more than 1000 keys + }) + + if err != nil && status != http.StatusNotFound { + return err + } + + if result == nil || len(result.Hits.Hits) == 0 { + return nil + } + + for _, hit := range result.Hits.Hits { + var sres searchResult + err = json.Unmarshal(hit, &sres) + if err != nil { + return err + } + + var e entry + err = json.Unmarshal(sres.Source.Value, &e.value) + if err != nil { + return err + } + + key, err := url.QueryUnescape(sres.ID) + if err != nil { + return err + } + + cont, err := fn(key, e) + if !cont || err != nil { + return err + } + } + + return nil +} + +func (s *store) configure(ctx context.Context, c *conf.C) { + s.log.Debugf("Configure ES store") + s.mx.Lock() + defer s.mx.Unlock() + + if s.cli != nil { + _ = s.cli.Close() + s.cli = nil + } + s.cliErr = nil + + cli, err := eslegclient.NewConnectedClient(ctx, c, s.name) + if err != nil { + s.log.Errorf("ES store, failed to create elasticsearch client: %v", err) + s.cliErr = err + } else { + s.cli = cli + } + + // Signal store is ready + s.once.Do(func() { + close(s.chReady) + }) + +} + +func (s *store) loop(ctx context.Context, cn context.CancelFunc, unsubFn UnsubscribeFunc, chCfg chan *conf.C) { + defer cn() + + // Unsubscribe on exit + defer unsubFn() + + defer s.log.Debug("ES store exit main loop") + + for { + select { + case <-ctx.Done(): + return + case cu := <-chCfg: + s.configure(ctx, cu) + } + } +} diff --git a/libbeat/statestore/backend/memlog/store.go b/libbeat/statestore/backend/memlog/store.go index 5bd6aac77fdf..67b94862262c 100644 --- a/libbeat/statestore/backend/memlog/store.go +++ b/libbeat/statestore/backend/memlog/store.go @@ -276,6 +276,10 @@ func (m *memstore) Remove(key string) bool { return true } +func (s *store) SetID(_ string) { + // NOOP +} + func (e entry) Decode(to interface{}) error { return typeconv.Convert(to, e.value) } diff --git 
a/libbeat/statestore/mock_test.go b/libbeat/statestore/mock_test.go index 165243bcd02d..9cc220df3ddd 100644 --- a/libbeat/statestore/mock_test.go +++ b/libbeat/statestore/mock_test.go @@ -93,3 +93,6 @@ func (m *mockStore) Each(fn func(string, backend.ValueDecoder) (bool, error)) er args := m.Called(fn) return args.Error(0) } + +func (m *mockStore) SetID(_ string) { +} diff --git a/libbeat/statestore/store.go b/libbeat/statestore/store.go index c204fcde8f51..875ba43e870c 100644 --- a/libbeat/statestore/store.go +++ b/libbeat/statestore/store.go @@ -61,6 +61,10 @@ func newStore(shared *sharedStore) *Store { } } +func (s *Store) SetID(id string) { + s.shared.backend.SetID(id) +} + // Close deactivates the current store. No new transacation can be generated. // Already active transaction will continue to function until Closed. // The backing store will be closed once all stores and active transactions have been closed. diff --git a/libbeat/statestore/storetest/storetest.go b/libbeat/statestore/storetest/storetest.go index 69f065e9bdc8..d43ce9bd1e42 100644 --- a/libbeat/statestore/storetest/storetest.go +++ b/libbeat/statestore/storetest/storetest.go @@ -213,3 +213,7 @@ func (s *MapStore) Each(fn func(string, backend.ValueDecoder) (bool, error)) err func (d valueUnpacker) Decode(to interface{}) error { return typeconv.Convert(to, d.from) } + +func (s *MapStore) SetID(_ string) { + // NOOP +} diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index 2bfb9f29cd8b..fe5d36016eb8 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -35,7 +35,7 @@ type states struct { // newStates generates a new states registry. 
func newStates(log *logp.Logger, stateStore beater.StateStore, listPrefix string) (*states, error) { - store, err := stateStore.Access() + store, err := stateStore.Access("") if err != nil { return nil, fmt.Errorf("can't access persistent store: %w", err) } diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go index fa604ed08d96..d15590493ee5 100644 --- a/x-pack/filebeat/input/awss3/states_test.go +++ b/x-pack/filebeat/input/awss3/states_test.go @@ -32,7 +32,7 @@ func (s *testInputStore) Close() { _ = s.registry.Close() } -func (s *testInputStore) Access() (*statestore.Store, error) { +func (s *testInputStore) Access(_ string) (*statestore.Store, error) { return s.registry.Get("filebeat") } diff --git a/x-pack/filebeat/input/salesforce/input_manager_test.go b/x-pack/filebeat/input/salesforce/input_manager_test.go index 8b73763f93fa..fc69f9180401 100644 --- a/x-pack/filebeat/input/salesforce/input_manager_test.go +++ b/x-pack/filebeat/input/salesforce/input_manager_test.go @@ -34,7 +34,7 @@ func makeTestStore(data map[string]interface{}) *statestore.Store { type stateStore struct{} -func (stateStore) Access() (*statestore.Store, error) { +func (stateStore) Access(_ string) (*statestore.Store, error) { return makeTestStore(map[string]interface{}{"hello": "world"}), nil } func (stateStore) CleanupInterval() time.Duration { return time.Duration(0) } diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index dc935d719507..84f6ad0f54bb 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -14,6 +14,9 @@ import ( "fmt" "io" "math" + "net/http" + "net/http/httptest" + "net/url" "os" "path/filepath" "strings" @@ -22,6 +25,7 @@ import ( "testing" "time" + "github.com/gofrs/uuid/v5" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -32,6 
+36,7 @@ import ( "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/beats/v7/testing/certutil" "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" "github.com/elastic/elastic-agent-client/v7/pkg/proto" ) @@ -234,50 +239,15 @@ func TestInputReloadUnderElasticAgent(t *testing.T) { "-E", "management.enabled=true", ) - // waitDeadlineOr5Mins looks at the test deadline - // and returns a reasonable value of waiting for a - // condition to be met. The possible values are: - // - if no test deadline is set, return 5 minutes - // - if a deadline is set and there is less than - // 0.5 second left, return the time left - // - otherwise return the time left minus 0.5 second. - waitDeadlineOr5Min := func() time.Duration { - deadline, deadlineSet := t.Deadline() - if deadlineSet { - left := time.Until(deadline) - final := left - 500*time.Millisecond - if final <= 0 { - return left - } - return final - } - return 5 * time.Minute + for _, contains := range []string{ + "Can only start an input when all related states are finished", + "file 'flog.log' is not finished, will retry starting the input soon", + "ForceReload set to TRUE", + "Reloading Beats inputs because forceReload is true", + "ForceReload set to FALSE", + } { + checkFilebeatLogs(t, filebeat, contains) } - - require.Eventually(t, func() bool { - return filebeat.LogContains("Can only start an input when all related states are finished") - }, waitDeadlineOr5Min(), 100*time.Millisecond, - "String 'Can only start an input when all related states are finished' not found on Filebeat logs") - - require.Eventually(t, func() bool { - return filebeat.LogContains("file 'flog.log' is not finished, will retry starting the input soon") - }, waitDeadlineOr5Min(), 100*time.Millisecond, - "String 'file 'flog.log' is not finished, will retry starting the input soon' not found on Filebeat logs") 
- - require.Eventually(t, func() bool { - return filebeat.LogContains("ForceReload set to TRUE") - }, waitDeadlineOr5Min(), 100*time.Millisecond, - "String 'ForceReload set to TRUE' not found on Filebeat logs") - - require.Eventually(t, func() bool { - return filebeat.LogContains("Reloading Beats inputs because forceReload is true") - }, waitDeadlineOr5Min(), 100*time.Millisecond, - "String 'Reloading Beats inputs because forceReload is true' not found on Filebeat logs") - - require.Eventually(t, func() bool { - return filebeat.LogContains("ForceReload set to FALSE") - }, waitDeadlineOr5Min(), 100*time.Millisecond, - "String 'ForceReload set to FALSE' not found on Filebeat logs") } // TestFailedOutputReportsUnhealthy ensures that if an output @@ -832,3 +802,273 @@ func writeStartUpInfo(t *testing.T, w io.Writer, info *proto.StartUpInfo) { _, err = w.Write(infoBytes) require.NoError(t, err, "failed to write connection information") } + +// Response structure for JSON +type response struct { + Message string `json:"message"` + Published string `json:"published"` +} + +func TestHTTPJSONInputReloadUnderElasticAgentWithElasticStateStore(t *testing.T) { + // First things first, ensure ES is running and we can connect to it. + // If ES is not running, the test will timeout and the only way to know + // what caused it is going through Filebeat's logs. 
+ integration.EnsureESIsRunning(t) + + // Create a test httpjson server for httpjson input + h := serverHelper{t: t} + defer func() { + assert.GreaterOrEqual(t, h.called, 2, "HTTP server should be called at least twice") + }() + testServer := httptest.NewServer(http.HandlerFunc(h.handler)) + defer testServer.Close() + + inputID := "httpjson-generic-" + uuid.Must(uuid.NewV4()).String() + inputUnit := &proto.UnitExpected{ + Id: inputID, + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: inputID, + Source: tests.RequireNewStruct(map[string]any{ + "id": inputID, + "type": "httpjson", + "name": "httpjson-1", + "enabled": true, + }), + Type: "httpjson", + Name: "httpjson-1", + Streams: []*proto.Stream{ + { + Id: inputID, + Source: integration.RequireNewStruct(t, map[string]any{ + "id": inputID, + "enabled": true, + "type": "httpjson", + "interval": "5s", + "request.url": testServer.URL, + "request.method": "GET", + "request.transforms": []any{ + map[string]any{ + "set": map[string]any{ + "target": "url.params.since", + "value": "[[.cursor.published]]", + "default": `[[formatDate (now (parseDuration "-24h")) "RFC3339"]]`, + }, + }, + }, + "cursor": map[string]any{ + "published": map[string]any{ + "value": "[[.last_event.published]]", + }, + }, + }), + }, + }, + }, + } + units := [][]*proto.UnitExpected{ + {outputUnitES(t, 1), inputUnit}, + {outputUnitES(t, 2), inputUnit}, + } + + idx := 0 + waiting := false + when := time.Now() + + final := atomic.Bool{} + nextState := func() { + if waiting { + if time.Now().After(when) { + t.Log("Next state") + idx = (idx + 1) % len(units) + waiting = false + h.notifyChange() + return + } + return + } + waiting = true + when = time.Now().Add(10 * time.Second) + } + + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + if management.DoesStateMatch(observed, units[idx], 
0) { + if idx < len(units)-1 { + nextState() + } else { + final.Store(true) + } + } + for _, unit := range observed.GetUnits() { + expected := []proto.State{proto.State_HEALTHY, proto.State_CONFIGURING, proto.State_STARTING} + if !waiting { + expected = append(expected, proto.State_STOPPING) + } + require.Containsf(t, expected, unit.GetState(), "Unit '%s' is not healthy, state: %s", unit.GetId(), unit.GetState().String()) + } + return &proto.CheckinExpected{ + Units: units[idx], + } + }, + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + } + + require.NoError(t, server.Start()) + t.Cleanup(server.Stop) + + t.Setenv("AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES", "httpjson,cel") + filebeat := NewFilebeat(t) + filebeat.RestartOnBeatOnExit = true + filebeat.Start( + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + "-E", "management.restart_on_output_change=true", + ) + + for _, contains := range []string{ + "Configure ES store", + "input-cursor::openStore: prefix: httpjson inputID: " + inputID, + "input-cursor store read 0 keys", // first, no previous data exists + "input-cursor store read 1 keys", // after the restart, previous key is read + } { + checkFilebeatLogs(t, filebeat, contains) + } + + require.Eventually(t, + final.Load, + waitDeadlineOr5Min(t), + 100*time.Millisecond, + "Failed to reach the final state", + ) +} + +type serverHelper struct { + t *testing.T + lock sync.Mutex + previous time.Time + called int + stateChanged bool +} + +func (h *serverHelper) verifyTime(since time.Time) time.Time { + h.lock.Lock() + defer h.lock.Unlock() + + h.called++ + + if h.previous.IsZero() { + assert.WithinDurationf(h.t, time.Now().Add(-24*time.Hour), since, 15*time.Minute, "since should be ~24h ago") + } else { + // XXX: `since` field is expected to be equal to the last published time. 
However, between unit restarts, the last + // updated field might not be persisted successfully. As a workaround, we allow a larger delta between restarts. + // However, we are still checking that the `since` field is not too far in the past, like 24h ago which is the + // initial value. + assert.WithinDurationf(h.t, h.previous, since, h.getDelta(since), "since should re-use last value") + } + h.previous = time.Now() + return h.previous +} + +func (h *serverHelper) getDelta(actual time.Time) time.Duration { + const delta = 1 * time.Second + if !h.stateChanged { + return delta + } + + dt := h.previous.Sub(actual) + if dt < -delta || dt > delta { + h.stateChanged = false + return time.Minute + } + return delta +} + +func (h *serverHelper) handler(w http.ResponseWriter, r *http.Request) { + since := parseParams(h.t, r.RequestURI) + published := h.verifyTime(since) + + w.Header().Set("Content-Type", "application/json") + err := json.NewEncoder(w).Encode(response{ + Message: "Hello", + Published: published.Format(time.RFC3339), + }) + require.NoError(h.t, err) +} + +func (h *serverHelper) notifyChange() { + h.lock.Lock() + defer h.lock.Unlock() + h.stateChanged = true +} + +func parseParams(t *testing.T, uri string) time.Time { + myUrl, err := url.Parse(uri) + require.NoError(t, err) + params, err := url.ParseQuery(myUrl.RawQuery) + require.NoError(t, err) + since := params["since"] + require.NotEmpty(t, since) + sinceStr := since[0] + sinceTime, err := time.Parse(time.RFC3339, sinceStr) + require.NoError(t, err) + return sinceTime +} + +func checkFilebeatLogs(t *testing.T, filebeat *integration.BeatProc, contains string) { + t.Helper() + const tick = 100 * time.Millisecond + + require.Eventually(t, + func() bool { return filebeat.LogContains(contains) }, + waitDeadlineOr5Min(t), + tick, + fmt.Sprintf("String '%s' not found on Filebeat logs", contains), + ) +} + +// waitDeadlineOr5Min looks at the test deadline and returns a reasonable value of waiting for a condition 
to be met. +// The possible values are: +// - if no test deadline is set, return 5 minutes +// - if a deadline is set and there is less than 0.5 second left, return the time left +// - otherwise return the time left minus 0.5 second. +func waitDeadlineOr5Min(t *testing.T) time.Duration { + deadline, deadlineSet := t.Deadline() + if !deadlineSet { + return 5 * time.Minute + } + left := time.Until(deadline) + final := left - 500*time.Millisecond + if final <= 0 { + return left + } + return final +} + +func outputUnitES(t *testing.T, id int) *proto.UnitExpected { + return &proto.UnitExpected{ + Id: fmt.Sprintf("output-unit-%d", id), + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: fmt.Sprintf("elasticsearch-%d", id), + Source: integration.RequireNewStruct(t, + map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"http://localhost:9200"}, + "username": "admin", + "password": "testing", + "protocol": "http", + "enabled": true, + "allow_older_versions": true, + }), + }, + } +}