Skip to content

Commit

Permalink
[8.x](backport #41446) [filebeat] Elasticsearch state storage for htt…
Browse files Browse the repository at this point in the history
…pjson and cel inputs (#42451)

This enables Elasticsearch as State Store Backend for Security Integrations for
the Agentless solution.

The scope of this change was narrowed down to supporting only `httpjson` inputs
in order to support Okta integration for the initial release. All the other
integrations inputs still use the file storage as before.
This is a short term solution for the state storage for k8s.

The feature currently can only be enabled with the
`AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES` env var.

The existing code relied on the inputs state storage to be fully configurable
before the main beat managers runs. The change delays the configuration of
`httpjson` input to the time when the actual configuration is received from the
Agent.

Example of the state storage index content for Okta integration:
```
{
  "took": 6,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "agentless-state-httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959",
        "_id": "httpjson::httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959::https://dev-36006609.okta.com/api/v1/logs",
        "_seq_no": 39,
        "_primary_term": 1,
        "_score": 1,
        "_source": {
          "v": {
            "ttl": 1800000000000,
            "updated": "2024-10-24T20:21:22.032Z",
            "cursor": {
              "published": "2024-10-24T20:19:53.542Z"
            }
          }
        }
      }
    ]
  }
}
```

The naming convention for all state store is `agentless-state-<input id>`,
since the expectation for agentless we would have only one agent per policy and
the agents are ephemeral.

Closes https://github.com/elastic/security-team/issues/11101

(cherry picked from commit 8180f23)

Co-authored-by: Aleksandr Maus <[email protected]>
Co-authored-by: Orestis Floros <[email protected]>
  • Loading branch information
3 people authored Jan 29, 2025
1 parent dfcbbfc commit 911f527
Show file tree
Hide file tree
Showing 29 changed files with 1,313 additions and 124 deletions.
34 changes: 31 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
Expand Down Expand Up @@ -80,7 +81,9 @@ type Filebeat struct {
type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin

type StateStore interface {
Access() (*statestore.Store, error)
// Access returns the storage registry depending on the type. This is needed for the Elasticsearch state store which
// is guarded by the feature.IsElasticsearchStateStoreEnabledForInput(typ) check.
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

Expand Down Expand Up @@ -299,13 +302,36 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry)
// Use context, like normal people do, hooking up to the beat.done channel
ctx, cn := context.WithCancel(context.Background())
go func() {
<-fb.done
cn()
}()

stateStore, err := openStateStore(ctx, b.Info, logp.NewLogger("filebeat"), config.Registry)
if err != nil {
logp.Err("Failed to open state store: %+v", err)
return err
}
defer stateStore.Close()

// If notifier is set, configure the listener for output configuration
// The notifier passes the elasticsearch output configuration down to the Elasticsearch backed state storage
// in order to allow it fully configure
if stateStore.notifier != nil {
b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
outCfg := conf.Namespace{}
if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
logp.Err("Failed to unpack the output config: %v", err)
return nil
}

stateStore.notifier.Notify(outCfg.Config())
return nil
})
}

err = processLogInputTakeOver(stateStore, config)
if err != nil {
logp.Err("Failed to attempt filestream state take over: %+v", err)
Expand Down Expand Up @@ -351,6 +377,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
defer func() {
_ = inputTaskGroup.Stop()
}()

// Store needs to be fully configured at this point
if err := v2InputLoader.Init(&inputTaskGroup); err != nil {
logp.Err("Failed to initialize the input managers: %v", err)
return err
Expand Down Expand Up @@ -535,7 +563,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
return nil
}

store, err := stateStore.Access()
store, err := stateStore.Access("")
if err != nil {
return fmt.Errorf("Failed to access state when attempting take over: %w", err)
}
Expand Down
46 changes: 40 additions & 6 deletions filebeat/beater/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,77 @@
package beater

import (
"context"
"time"

"github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/features"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/backend"
"github.com/elastic/beats/v7/libbeat/statestore/backend/es"
"github.com/elastic/beats/v7/libbeat/statestore/backend/memlog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

type filebeatStore struct {
registry *statestore.Registry
esRegistry *statestore.Registry
storeName string
cleanInterval time.Duration

// Notifies the Elasticsearch store about configuration change
// which is available only after the beat runtime manager connects to the Agent
// and receives the output configuration
notifier *es.Notifier
}

func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
memlog, err := memlog.New(logger, memlog.Settings{
func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
var (
reg backend.Registry
err error

esreg *es.Registry
notifier *es.Notifier
)

if features.IsElasticsearchStateStoreEnabled() {
notifier = es.NewNotifier()
esreg = es.New(ctx, logger, notifier)
}

reg, err = memlog.New(logger, memlog.Settings{
Root: paths.Resolve(paths.Data, cfg.Path),
FileMode: cfg.Permissions,
})
if err != nil {
return nil, err
}

return &filebeatStore{
registry: statestore.NewRegistry(memlog),
store := &filebeatStore{
registry: statestore.NewRegistry(reg),
storeName: info.Beat,
cleanInterval: cfg.CleanInterval,
}, nil
notifier: notifier,
}

if esreg != nil {
store.esRegistry = statestore.NewRegistry(esreg)
}

return store, nil
}

func (s *filebeatStore) Close() {
s.registry.Close()
}

func (s *filebeatStore) Access() (*statestore.Store, error) {
// Access returns the storage registry depending on the type. Default is the file store.
func (s *filebeatStore) Access(typ string) (*statestore.Store, error) {
if features.IsElasticsearchStateStoreEnabledForInput(typ) && s.esRegistry != nil {
return s.esRegistry.Get(s.storeName)
}
return s.registry.Get(s.storeName)
}

Expand Down
59 changes: 59 additions & 0 deletions filebeat/features/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package features

import (
"os"
"strings"
)

// List of input types Elasticsearch state store is enabled for
var esTypesEnabled map[string]struct{}

var isESEnabled bool

func init() {
initFromEnv("AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES")
}

func initFromEnv(envName string) {
esTypesEnabled = make(map[string]struct{})

arr := strings.Split(os.Getenv(envName), ",")
for _, e := range arr {
k := strings.TrimSpace(e)
if k != "" {
esTypesEnabled[k] = struct{}{}
}
}
isESEnabled = len(esTypesEnabled) > 0
}

// IsElasticsearchStateStoreEnabled returns true if feature is enabled for agentless
func IsElasticsearchStateStoreEnabled() bool {
return isESEnabled
}

// IsElasticsearchStateStoreEnabledForInput returns true if the provided input type uses Elasticsearch for state storage if the Elasticsearch state store feature is enabled
func IsElasticsearchStateStoreEnabledForInput(inputType string) bool {
if IsElasticsearchStateStoreEnabled() {
_, ok := esTypesEnabled[inputType]
return ok
}
return false
}
86 changes: 86 additions & 0 deletions filebeat/features/features_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package features

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_initFromEnv(t *testing.T) {
const envName = "TEST_AGENTLESS_ENV"

t.Run("Without setting env", func(t *testing.T) {
// default init
assert.False(t, IsElasticsearchStateStoreEnabled())
assert.Empty(t, esTypesEnabled)
assert.False(t, IsElasticsearchStateStoreEnabledForInput("xxx"))

// init from env
initFromEnv(envName)
assert.False(t, IsElasticsearchStateStoreEnabled())
assert.Empty(t, esTypesEnabled)
assert.False(t, IsElasticsearchStateStoreEnabledForInput("xxx"))
})

tests := []struct {
name string
value string
wantEnabled bool
wantContains []string
}{
{
name: "Empty",
value: "",
wantEnabled: false,
wantContains: nil,
},
{
name: "Single value",
value: "xxx",
wantEnabled: true,
wantContains: []string{"xxx"},
},
{
name: "Multiple values",
value: "xxx,yyy",
wantEnabled: true,
wantContains: []string{"xxx", "yyy"},
},
{
name: "Multiple values with spaces",
value: ",,, , xxx , yyy, ,,,,",
wantEnabled: true,
wantContains: []string{"xxx", "yyy"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Setenv(envName, tt.value)
initFromEnv(envName)

assert.Equal(t, tt.wantEnabled, IsElasticsearchStateStoreEnabled())
for _, contain := range tt.wantContains {
assert.Contains(t, esTypesEnabled, contain)
assert.True(t, IsElasticsearchStateStoreEnabledForInput(contain))
}
assert.Len(t, esTypesEnabled, len(tt.wantContains))
})
}
}
8 changes: 4 additions & 4 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (e *inputTestingEnvironment) abspath(filename string) string {
}

func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")

actual := 0
err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) {
Expand Down Expand Up @@ -331,7 +331,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
}

inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")
id := getIDFromPath(filepath, inputID, fi)

var entry registryEntry
Expand All @@ -352,7 +352,7 @@ func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expect
}

func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) {
inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")

var entry registryEntry
err := inputStore.Get(key, &entry)
Expand Down Expand Up @@ -553,7 +553,7 @@ func (s *testInputStore) Close() {
s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *testStore) Close() {
s.registry.Close()
}

func (s *testStore) Access() (*statestore.Store, error) {
func (s *testStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filestream-benchmark")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ const globalInputID = ".global"

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access() (*statestore.Store, error)
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/internal/input-logfile/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ var closeStore = (*store).close
func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) {
ok := false

persistentStore, err := statestore.Access()
persistentStore, err := statestore.Access("")
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore {

func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts }
func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod }
func (ts testStateStore) Access() (*statestore.Store, error) {
func (ts testStateStore) Access(string) (*statestore.Store, error) {
if ts.Store == nil {
return nil, errors.New("no store configured")
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *testInputStore) Close() {
s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/journald/input_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestInputSeek(t *testing.T) {
env := newInputTestingEnvironment(t)

if testCase.cursor != "" {
store, _ := env.stateStore.Access()
store, _ := env.stateStore.Access("")
tmp := map[string]any{}
if err := json.Unmarshal([]byte(testCase.cursor), &tmp); err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 911f527

Please sign in to comment.