Skip to content

Commit

Permalink
invalidate api key if remote output is removed
Browse files Browse the repository at this point in the history
  • Loading branch information
juliaElastic committed Nov 7, 2023
1 parent 6dc08cd commit 9a0e8b1
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 87 deletions.
9 changes: 8 additions & 1 deletion internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,16 +556,23 @@ func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) {

func (ack *AckT) invalidateAPIKeys(ctx context.Context, zlog zerolog.Logger, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, skip string) {
ids := make([]string, 0, len(toRetireAPIKeyIDs))
output := ""
for _, k := range toRetireAPIKeyIDs {
if k.ID == skip || k.ID == "" {
continue
}
ids = append(ids, k.ID)
output = k.Output
}
// using remote es bulker to invalidate api key - supposing all retire api keky ids belong to the same remote es
bulk := ack.bulk
if output != "" {
bulk = ack.bulk.GetBulker(output)
}

if len(ids) > 0 {
zlog.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
if err := bulk.APIKeyInvalidate(ctx, ids...); err != nil {
zlog.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}
Expand Down
88 changes: 85 additions & 3 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
"github.com/elastic/fleet-server/v7/internal/pkg/build"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/es"

"github.com/elastic/go-elasticsearch/v8"
Expand Down Expand Up @@ -66,13 +68,13 @@ type Bulk interface {
// Accessor used to talk to elastic search direcly bypassing bulk engine
Client() *elasticsearch.Client

// Reusing tracer to create bulker for remote ES outputs
Tracer() *apm.Tracer

CheckRemoteOutputChanged(name string, newCfg map[string]interface{})

RemoteOutputCh() chan bool

CreateAndGetBulker(outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, error)
GetBulker(outputName string) Bulk

ReadSecrets(ctx context.Context, secretIds []string) (map[string]string, error)
}

Expand All @@ -87,6 +89,7 @@ type Bulker struct {
tracer *apm.Tracer
remoteOutputConfigMap map[string]map[string]interface{}
remoteOutputCh chan bool
bulkerMap map[string]Bulk
}

const (
Expand Down Expand Up @@ -116,7 +119,86 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker
tracer: tracer,
remoteOutputConfigMap: make(map[string]map[string]interface{}),
remoteOutputCh: make(chan bool, 1),
bulkerMap: make(map[string]Bulk),
}
}

func (b *Bulker) GetBulker(outputName string) Bulk {
return b.bulkerMap[outputName]
}

func (b *Bulker) CreateAndGetBulker(outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (Bulk, error) {
b.CheckRemoteOutputChanged(outputName, outputMap[outputName])
bulker := b.bulkerMap[outputName]
if bulker != nil {
return bulker, nil
}
bulkCtx, bulkCancel := context.WithCancel(context.Background())
defer bulkCancel()
es, err := b.createRemoteEsClient(bulkCtx, outputName, serviceToken, outputMap)
if err != nil {
return nil, err
}
// starting a new bulker to create/update API keys for remote ES output
newBulker := NewBulker(es, b.tracer)
b.bulkerMap[outputName] = newBulker

errCh := make(chan error)
go func() {
runFunc := func() (err error) {
return newBulker.Run(bulkCtx)
}

errCh <- runFunc()
}()
go func() {
select {
case err = <-errCh:
case <-bulkCtx.Done():
err = bulkCtx.Err()
}
}()

return newBulker, nil
}

func (b *Bulker) createRemoteEsClient(ctx context.Context, outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (*elasticsearch.Client, error) {
hostsObj := outputMap[outputName]["hosts"]
hosts, ok := hostsObj.([]interface{})
if !ok {
return nil, fmt.Errorf("failed to get hosts from output: %v", hostsObj)
}
hostsStrings := make([]string, len(hosts))
for i, host := range hosts {
hostsStrings[i], ok = host.(string)
if !ok {
return nil, fmt.Errorf("failed to get hosts from output: %v", host)
}
}

cfg := config.Config{
Output: config.Output{
Elasticsearch: config.Elasticsearch{
Hosts: hostsStrings,
ServiceToken: serviceToken,
},
},
}
es, err := es.NewClient(ctx, &cfg, false, elasticsearchOptions(
true, build.Info{},
)...)
if err != nil {
return nil, err
}
return es, nil
}

func elasticsearchOptions(instumented bool, bi build.Info) []es.ConfigOption {
options := []es.ConfigOption{es.WithUserAgent("Remote-Fleet-Server", bi)}
if instumented {
options = append(options, es.InstrumentRoundTripper())
}
return options
}

func (b *Bulker) Client() *elasticsearch.Client {
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

138 changes: 57 additions & 81 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@ import (
"go.elastic.co/apm/v2"

"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
"github.com/elastic/fleet-server/v7/internal/pkg/build"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
"github.com/elastic/go-elasticsearch/v8"
)

const (
Expand All @@ -37,8 +33,6 @@ const (
var (
ErrNoOutputPerms = errors.New("output permission sections not found")
ErrFailInjectAPIKey = errors.New("fail inject api key")
// TODO clean up bulkers when a remote output is removed
bulkerMap = make(map[string]bulk.Bulk)
)

type Output struct {
Expand Down Expand Up @@ -66,7 +60,7 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B
}
case OutputTypeRemoteElasticsearch:
zlog.Debug().Msg("preparing remote elasticsearch output")
newBulker, err := p.createAndGetBulker(bulker, outputMap)
newBulker, err := bulker.CreateAndGetBulker(p.Name, p.ServiceToken, outputMap)
if err != nil {
return err
}
Expand Down Expand Up @@ -115,6 +109,61 @@ func (p *Output) prepareElasticsearch(
agent.Outputs[p.Name] = output
}

// retire api key of removed remote output
var toRetireAPIKeys *model.ToRetireAPIKeyIdsItems
var removedOutputKey string
// find the first output that is removed - supposing one output can be removed at a time
for agentOutputKey, agentOutput := range agent.Outputs {
found := false
for outputMapKey := range outputMap {
if agentOutputKey == outputMapKey {
found = true
}
}
if !found {
zlog.Info().Str("APIKeyID", agentOutput.APIKeyID).Str("output", agentOutputKey).Msg("Output removed, will retire API key")
toRetireAPIKeys = &model.ToRetireAPIKeyIdsItems{
ID: agentOutput.APIKeyID,
RetiredAt: time.Now().UTC().Format(time.RFC3339),
Output: agentOutputKey,
}
removedOutputKey = agentOutputKey
break
}
}

if toRetireAPIKeys != nil {

// adding remote API key to new output toRetireAPIKeys
fields := map[string]interface{}{
dl.FieldPolicyOutputToRetireAPIKeyIDs: *toRetireAPIKeys,
}

// Using painless script to append the old keys to the history
body, err := renderUpdatePainlessScript(p.Name, fields)
if err != nil {
return fmt.Errorf("could not update painless script: %w", err)
}

if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
zlog.Error().Err(err).Msg("fail update agent record")
return fmt.Errorf("fail update agent record: %w", err)
}

// remove output from agent doc
body, err = json.Marshal(map[string]interface{}{

Check failure on line 154 in internal/pkg/policy/policy_output.go

View workflow job for this annotation

GitHub Actions / lint (linux)

ineffectual assignment to err (ineffassign)
"script": map[string]interface{}{
"lang": "painless",
"source": fmt.Sprintf("ctx._source['outputs'].remove(\"%s\")", removedOutputKey),
},
})

if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
zlog.Error().Err(err).Msg("fail update agent record")
return fmt.Errorf("fail update agent record: %w", err)
}
}

// Determine whether we need to generate an output ApiKey.
// This is accomplished by comparing the sha2 hash stored in the corresponding
// output in the agent record with the precalculated sha2 hash of the role.
Expand Down Expand Up @@ -227,6 +276,7 @@ func (p *Output) prepareElasticsearch(
fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{
ID: output.APIKeyID,
RetiredAt: time.Now().UTC().Format(time.RFC3339),
Output: p.Name,
}
}

Expand Down Expand Up @@ -272,80 +322,6 @@ func (p *Output) prepareElasticsearch(
return nil
}

func (p *Output) createAndGetBulker(mainBulker bulk.Bulk, outputMap map[string]map[string]interface{}) (bulk.Bulk, error) {
mainBulker.CheckRemoteOutputChanged(p.Name, outputMap[p.Name])
bulker := bulkerMap[p.Name]
if bulker != nil {
return bulker, nil
}
bulkCtx, bulkCancel := context.WithCancel(context.Background())
defer bulkCancel()
es, err := p.createRemoteEsClient(bulkCtx, outputMap)
if err != nil {
return nil, err
}
// starting a new bulker to create/update API keys for remote ES output
newBulker := bulk.NewBulker(es, mainBulker.Tracer())
bulkerMap[p.Name] = newBulker

errCh := make(chan error)
go func() {
runFunc := func() (err error) {
return newBulker.Run(bulkCtx)
}

errCh <- runFunc()
}()
go func() {
select {
case err = <-errCh:
case <-bulkCtx.Done():
err = bulkCtx.Err()
}
}()

return newBulker, nil
}

func (p *Output) createRemoteEsClient(ctx context.Context, outputMap map[string]map[string]interface{}) (*elasticsearch.Client, error) {
hostsObj := outputMap[p.Name]["hosts"]
hosts, ok := hostsObj.([]interface{})
if !ok {
return nil, fmt.Errorf("failed to get hosts from output: %v", hostsObj)
}
hostsStrings := make([]string, len(hosts))
for i, host := range hosts {
hostsStrings[i], ok = host.(string)
if !ok {
return nil, fmt.Errorf("failed to get hosts from output: %v", host)
}
}

cfg := config.Config{
Output: config.Output{
Elasticsearch: config.Elasticsearch{
Hosts: hostsStrings,
ServiceToken: p.ServiceToken,
},
},
}
es, err := es.NewClient(ctx, &cfg, false, elasticsearchOptions(
true, build.Info{},
)...)
if err != nil {
return nil, err
}
return es, nil
}

func elasticsearchOptions(instumented bool, bi build.Info) []es.ConfigOption {
options := []es.ConfigOption{es.WithUserAgent("Remote-Fleet-Server", bi)}
if instumented {
options = append(options, es.InstrumentRoundTripper())
}
return options
}

func fetchAPIKeyRoles(ctx context.Context, b bulk.Bulk, apiKeyID string) (*RoleT, error) {
res, err := b.APIKeyRead(ctx, apiKeyID, true)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/policy/self.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (m *selfMonitorT) groupByLatest(policies []model.Policy) map[string]model.P
return groupByLatest(policies)
}

// UnitStateDegraded / Failed
func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error) {
m.mut.Lock()
defer m.mut.Unlock()
Expand Down
10 changes: 8 additions & 2 deletions internal/pkg/testing/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,14 @@ func (m *MockBulk) Client() *elasticsearch.Client {
return args.Get(0).(*elasticsearch.Client)
}

func (m *MockBulk) Tracer() *apm.Tracer {
return nil
func (m *MockBulk) GetBulker(outputName string) bulk.Bulk {
args := m.Called()
return args.Get(0).(bulk.Bulk)
}

func (m *MockBulk) CreateAndGetBulker(outputName string, serviceToken string, outputMap map[string]map[string]interface{}) (bulk.Bulk, error) {
args := m.Called(outputName, serviceToken, outputMap)
return args.Get(0).(bulk.Bulk), args.Error(1)
}

func (m *MockBulk) CheckRemoteOutputChanged(name string, newCfg map[string]interface{}) {
Expand Down

0 comments on commit 9a0e8b1

Please sign in to comment.