From 9a0e8b17841760af27c6f7a2da1a5d528d7f581b Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Tue, 7 Nov 2023 16:10:04 +0100 Subject: [PATCH] invalidate api key if remote output is removed --- internal/pkg/api/handleAck.go | 9 +- internal/pkg/bulk/engine.go | 88 ++++++++++++++++- internal/pkg/model/schema.go | 2 + internal/pkg/policy/policy_output.go | 138 +++++++++++---------------- internal/pkg/policy/self.go | 1 + internal/pkg/testing/bulk.go | 10 +- 6 files changed, 161 insertions(+), 87 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index b4c15b822..c08094b2e 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -556,16 +556,23 @@ func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) { func (ack *AckT) invalidateAPIKeys(ctx context.Context, zlog zerolog.Logger, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, skip string) { ids := make([]string, 0, len(toRetireAPIKeyIDs)) + output := "" for _, k := range toRetireAPIKeyIDs { if k.ID == skip || k.ID == "" { continue } ids = append(ids, k.ID) + output = k.Output + } + // using remote es bulker to invalidate api key - supposing all retire api keky ids belong to the same remote es + bulk := ack.bulk + if output != "" { + bulk = ack.bulk.GetBulker(output) } if len(ids) > 0 { zlog.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys") - if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil { + if err := bulk.APIKeyInvalidate(ctx, ids...); err != nil { zlog.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys") } } diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 9fb870138..38fdfc0db 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -15,6 +15,8 @@ import ( "time" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" + "github.com/elastic/fleet-server/v7/internal/pkg/build" + "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/go-elasticsearch/v8" @@ -66,13 +68,13 @@ type Bulk interface { // Accessor used to talk to elastic search direcly bypassing bulk engine Client() *elasticsearch.Client - // Reusing tracer to create bulker for remote ES outputs - Tracer() *apm.Tracer - CheckRemoteOutputChanged(name string, newCfg map[string]interface{}) RemoteOutputCh() chan bool + CreateAndGetBulker(outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, error) + GetBulker(outputName string) Bulk + ReadSecrets(ctx context.Context, secretIds []string) (map[string]string, error) } @@ -87,6 +89,7 @@ type Bulker struct { tracer *apm.Tracer remoteOutputConfigMap map[string]map[string]interface{} remoteOutputCh chan bool + bulkerMap map[string]Bulk } const ( @@ -116,7 +119,86 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker tracer: tracer, remoteOutputConfigMap: make(map[string]map[string]interface{}), remoteOutputCh: make(chan bool, 1), + bulkerMap: make(map[string]Bulk), + } +} + +func (b *Bulker) GetBulker(outputName string) Bulk { + return b.bulkerMap[outputName] +} + +func (b *Bulker) CreateAndGetBulker(outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, error) { + b.CheckRemoteOutputChanged(outputName, outputMap[outputName]) + bulker := b.bulkerMap[outputName] + if bulker != nil { + return bulker, nil + } + bulkCtx, bulkCancel := context.WithCancel(context.Background()) + defer bulkCancel() + es, err := b.createRemoteEsClient(bulkCtx, outputName, serviceToken, outputMap) + if err != nil { + return nil, err + } + // starting a new bulker to create/update API keys for remote ES output + newBulker := NewBulker(es, b.tracer) + b.bulkerMap[outputName] = newBulker + + errCh := make(chan error) + go func() { + runFunc := func() (err error) { + return newBulker.Run(bulkCtx) + } + + errCh <- runFunc() + }() + go func() { + select { + case err = <-errCh: + case <-bulkCtx.Done(): + err = bulkCtx.Err() + } + }() + + return newBulker, nil +} + +func (b *Bulker) createRemoteEsClient(ctx context.Context, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (*elasticsearch.Client, error) { + hostsObj := outputMap[outputName]["hosts"] + hosts, ok := hostsObj.([]interface{}) + if !ok { + return nil, fmt.Errorf("failed to get hosts from output: %v", hostsObj) + } + hostsStrings := make([]string, len(hosts)) + for i, host := range hosts { + hostsStrings[i], ok = host.(string) + if !ok { + return nil, fmt.Errorf("failed to get hosts from output: %v", host) + } + } + + cfg := config.Config{ + Output: config.Output{ + Elasticsearch: config.Elasticsearch{ + Hosts: hostsStrings, + ServiceToken: serviceToken, + }, + }, + } + es, err := es.NewClient(ctx, &cfg, false, elasticsearchOptions( + true, build.Info{}, + )...) + if err != nil { + return nil, err + } + return es, nil +} + +func elasticsearchOptions(instumented bool, bi build.Info) []es.ConfigOption { + options := []es.ConfigOption{es.WithUserAgent("Remote-Fleet-Server", bi)} + if instumented { + options = append(options, es.InstrumentRoundTripper()) } + return options } func (b *Bulker) Client() *elasticsearch.Client { diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index edbf33e06..dfbcc808e 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -468,4 +468,6 @@ type ToRetireAPIKeyIdsItems struct { // Date/time the API key was retired RetiredAt string `json:"retired_at,omitempty"` + + Output string `json:"output,omitempty"` } diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 140d62978..134baad44 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -16,15 +16,11 @@ import ( "go.elastic.co/apm/v2" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" - "github.com/elastic/fleet-server/v7/internal/pkg/build" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/dl" - "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/smap" - "github.com/elastic/go-elasticsearch/v8" ) const ( @@ -37,8 +33,6 @@ const ( var ( ErrNoOutputPerms = errors.New("output permission sections not found") ErrFailInjectAPIKey = errors.New("fail inject api key") - // TODO clean up bulkers when a remote output is removed - bulkerMap = make(map[string]bulk.Bulk) ) type Output struct { @@ -66,7 +60,7 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B } case OutputTypeRemoteElasticsearch: zlog.Debug().Msg("preparing remote elasticsearch output") - newBulker, err := p.createAndGetBulker(bulker, outputMap) + newBulker, err := bulker.CreateAndGetBulker(p.Name, p.ServiceToken, outputMap) if err != nil { return err } @@ -115,6 +109,61 @@ func (p *Output) prepareElasticsearch( agent.Outputs[p.Name] = output } + // retire api key of removed remote output + var toRetireAPIKeys *model.ToRetireAPIKeyIdsItems + var removedOutputKey string + // find the first output that is removed - supposing one output can be removed at a time + for agentOutputKey, agentOutput := range agent.Outputs { + found := false + for outputMapKey := range outputMap { + if agentOutputKey == outputMapKey { + found = true + } + } + if !found { + zlog.Info().Str("APIKeyID", agentOutput.APIKeyID).Str("output", agentOutputKey).Msg("Output removed, will retire API key") + toRetireAPIKeys = &model.ToRetireAPIKeyIdsItems{ + ID: agentOutput.APIKeyID, + RetiredAt: time.Now().UTC().Format(time.RFC3339), + Output: agentOutputKey, + } + removedOutputKey = agentOutputKey + break + } + } + + if toRetireAPIKeys != nil { + + // adding remote API key to new output toRetireAPIKeys + fields := map[string]interface{}{ + dl.FieldPolicyOutputToRetireAPIKeyIDs: *toRetireAPIKeys, + } + + // Using painless script to append the old keys to the history + body, err := renderUpdatePainlessScript(p.Name, fields) + if err != nil { + return fmt.Errorf("could not update painless script: %w", err) + } + + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { + zlog.Error().Err(err).Msg("fail update agent record") + return fmt.Errorf("fail update agent record: %w", err) + } + + // remove output from agent doc + body, err = json.Marshal(map[string]interface{}{ + "script": map[string]interface{}{ + "lang": "painless", + "source": fmt.Sprintf("ctx._source['outputs'].remove(\"%s\")", removedOutputKey), + }, + }) + + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { + zlog.Error().Err(err).Msg("fail update agent record") + return fmt.Errorf("fail update agent record: %w", err) + } + } + // Determine whether we need to generate an output ApiKey. // This is accomplished by comparing the sha2 hash stored in the corresponding // output in the agent record with the precalculated sha2 hash of the role. @@ -227,6 +276,7 @@ func (p *Output) prepareElasticsearch( fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{ ID: output.APIKeyID, RetiredAt: time.Now().UTC().Format(time.RFC3339), + Output: p.Name, } } @@ -272,80 +322,6 @@ func (p *Output) prepareElasticsearch( return nil } -func (p *Output) createAndGetBulker(mainBulker bulk.Bulk, outputMap map[string]map[string]interface{}) (bulk.Bulk, error) { - mainBulker.CheckRemoteOutputChanged(p.Name, outputMap[p.Name]) - bulker := bulkerMap[p.Name] - if bulker != nil { - return bulker, nil - } - bulkCtx, bulkCancel := context.WithCancel(context.Background()) - defer bulkCancel() - es, err := p.createRemoteEsClient(bulkCtx, outputMap) - if err != nil { - return nil, err - } - // starting a new bulker to create/update API keys for remote ES output - newBulker := bulk.NewBulker(es, mainBulker.Tracer()) - bulkerMap[p.Name] = newBulker - - errCh := make(chan error) - go func() { - runFunc := func() (err error) { - return newBulker.Run(bulkCtx) - } - - errCh <- runFunc() - }() - go func() { - select { - case err = <-errCh: - case <-bulkCtx.Done(): - err = bulkCtx.Err() - } - }() - - return newBulker, nil -} - -func (p *Output) createRemoteEsClient(ctx context.Context, outputMap map[string]map[string]interface{}) (*elasticsearch.Client, error) { - hostsObj := outputMap[p.Name]["hosts"] - hosts, ok := hostsObj.([]interface{}) - if !ok { - return nil, fmt.Errorf("failed to get hosts from output: %v", hostsObj) - } - hostsStrings := make([]string, len(hosts)) - for i, host := range hosts { - hostsStrings[i], ok = host.(string) - if !ok { - return nil, fmt.Errorf("failed to get hosts from output: %v", host) - } - } - - cfg := config.Config{ - Output: config.Output{ - Elasticsearch: config.Elasticsearch{ - Hosts: hostsStrings, - ServiceToken: p.ServiceToken, - }, - }, - } - es, err := es.NewClient(ctx, &cfg, false, elasticsearchOptions( - true, build.Info{}, - )...) - if err != nil { - return nil, err - } - return es, nil -} - -func elasticsearchOptions(instumented bool, bi build.Info) []es.ConfigOption { - options := []es.ConfigOption{es.WithUserAgent("Remote-Fleet-Server", bi)} - if instumented { - options = append(options, es.InstrumentRoundTripper()) - } - return options -} - func fetchAPIKeyRoles(ctx context.Context, b bulk.Bulk, apiKeyID string) (*RoleT, error) { res, err := b.APIKeyRead(ctx, apiKeyID, true) if err != nil { diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index 8fc9b28e7..ed22568a0 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -193,6 +193,7 @@ func (m *selfMonitorT) groupByLatest(policies []model.Policy) map[string]model.P return groupByLatest(policies) } +// UnitStateDegraded / Failed func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error) { m.mut.Lock() defer m.mut.Unlock() diff --git a/internal/pkg/testing/bulk.go b/internal/pkg/testing/bulk.go index afe91099c..37fe69bfa 100644 --- a/internal/pkg/testing/bulk.go +++ b/internal/pkg/testing/bulk.go @@ -79,8 +79,14 @@ func (m *MockBulk) Client() *elasticsearch.Client { return args.Get(0).(*elasticsearch.Client) } -func (m *MockBulk) Tracer() *apm.Tracer { - return nil +func (m *MockBulk) GetBulker(outputName string) bulk.Bulk { + args := m.Called() + return args.Get(0).(bulk.Bulk) +} + +func (m *MockBulk) CreateAndGetBulker(outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (bulk.Bulk, error) { + args := m.Called(outputName, serviceToken, outputMap) + return args.Get(0).(bulk.Bulk), args.Error(1) } func (m *MockBulk) CheckRemoteOutputChanged(name string, newCfg map[string]interface{}) {